How do I subscribe to multiple Google PubSub projects in Spring GCP?

8
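I want to subscribe to multiple Google Cloud PubSub projects in a Spring Boot application. After reading the related questions (How to configure two pubsub gcp projects in one spring boot application with Spring Cloud), (How to use Spring Cloud GCP for multiple Google projects) and https://github.com/spring-cloud/spring-cloud-gcp/issues/1639, I tried the approach below. However, since there is no proper documentation or sample code, I am not sure how to implement this. I get the error shown below, possibly because the credentials are not being loaded.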

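  • What is the correct way to implement this?
  • How do I load the credentials of the different projects to configure each InputChannel?
  • Can I have beans for different project IDs in the same Config file?
  • Do I need a separate properties file for each project ID?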

PubSubConfig

The configuration for the second PubSub project is commented out.

    package com.dialog.chatboard.config;

    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
    import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate;
    import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
    import org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.messaging.MessageChannel;

    @Configuration
    public class PubSubConfig {

        DefaultSubscriberFactory genieFactory = new DefaultSubscriberFactory(() -> "XXXXX-projectId-01");
        PubSubSubscriberTemplate genieSubscriberTemplate = new PubSubSubscriberTemplate(genieFactory);


        // DefaultSubscriberFactory retailHubFactory = new DefaultSubscriberFactory(() -> "projectId-02");
        // PubSubSubscriberTemplate retailHubSubscriberTemplate = new PubSubSubscriberTemplate(retailHubFactory);



        @Bean
        public MessageChannel genieInputChannel() {
            return new DirectChannel();
        }

        @Bean
        public PubSubInboundChannelAdapter genieChannelAdapter(
                @Qualifier("genieInputChannel") MessageChannel inputChannel) {
            PubSubInboundChannelAdapter adapter =
                    new PubSubInboundChannelAdapter(genieSubscriberTemplate, "agent-genie-sub");
            adapter.setOutputChannel(inputChannel);

            return adapter;
        }

        // @Bean
        // public MessageChannel retailHubInputChannel() {
        //     return new DirectChannel();
        // }
        //
        // @Bean
        // public PubSubInboundChannelAdapter retailHubChannelAdapter(
        //         @Qualifier("retailHubInputChannel") MessageChannel inputChannel) {
        //     PubSubInboundChannelAdapter adapter =
        //             new PubSubInboundChannelAdapter(retailHubSubscriberTemplate, "retail-hub-sub");
        //     adapter.setOutputChannel(inputChannel);
        //
        //     return adapter;
        // }

    }

application.properties (for one project ID)

spring.cloud.gcp.project-id=XXXXX-projectId-01
spring.cloud.gcp.credentials.location=file:/home/XXXXXXXX/DialogFlow/XXXXXXXXXXXXX.json

Error

I have set GOOGLE_APPLICATION_CREDENTIALS for XXXXXXX-projectId-01 in the Linux environment variables.

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'pubSubConfig' defined in file [/home/kabilesh/IdeaProjects/chatboard/target/classes/com/dialog/chatboard/config/PubSubConfig.class]: Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.dialog.chatboard.config.PubSubConfig$$EnhancerBySpringCGLIB$$8bcf7442]: Constructor threw exception; nested exception is java.lang.RuntimeException: Error creating the SubscriberStub
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateBean(AbstractAutowireCapableBeanFactory.java:1320) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1214) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:882) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at com.dialog.chatboard.ChatboardApplication.main(ChatboardApplication.java:28) [classes/:na]
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.dialog.chatboard.config.PubSubConfig$$EnhancerBySpringCGLIB$$8bcf7442]: Constructor threw exception; nested exception is java.lang.RuntimeException: Error creating the SubscriberStub
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:217) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:87) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateBean(AbstractAutowireCapableBeanFactory.java:1312) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 17 common frames omitted
Caused by: java.lang.RuntimeException: Error creating the SubscriberStub
at org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory.createSubscriberStub(DefaultSubscriberFactory.java:277) ~[spring-cloud-gcp-pubsub-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate.<init>(PubSubSubscriberTemplate.java:100) ~[spring-cloud-gcp-pubsub-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at com.dialog.chatboard.config.PubSubConfig.<init>(PubSubConfig.java:19) ~[classes/:na]
at com.dialog.chatboard.config.PubSubConfig$$EnhancerBySpringCGLIB$$8bcf7442.<init>(<generated>) ~[classes/:na]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_212]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_212]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_212]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_212]
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:204) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 19 common frames omitted
Caused by: java.io.IOException: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
at com.google.auth.oauth2.DefaultCredentialsProvider.getDefaultCredentials(DefaultCredentialsProvider.java:134) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:119) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:91) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.api.gax.core.GoogleCredentialsProvider.getCredentials(GoogleCredentialsProvider.java:67) ~[gax-1.54.0.jar:1.54.0]
at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:135) ~[gax-1.54.0.jar:1.54.0]
at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263) ~[google-cloud-pubsub-1.103.0.jar:1.103.0]
at org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory.createSubscriberStub(DefaultSubscriberFactory.java:274) ~[spring-cloud-gcp-pubsub-1.2.2.RELEASE.jar:1.2.2.RELEASE]
... 27 common frames omitted

Disconnected from the target VM, address: '127.0.0.1:34223', transport: 'socket'

Process finished with exit code 1
2 Answers

11

To do this, you need to take the following steps:

First, turn off the GCP pubsub auto-configuration

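import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.autoconfigure.pubsub.GcpPubSubAutoConfiguration;
import org.springframework.cloud.gcp.autoconfigure.pubsub.GcpPubSubReactiveAutoConfiguration;

// Exclude the default Pub/Sub auto-configuration so the per-project beans below are the only ones.
@SpringBootApplication(exclude = {
        GcpPubSubAutoConfiguration.class,
        GcpPubSubReactiveAutoConfiguration.class
})
public class PubsubApplication {

    public static void main(String[] args) {
        SpringApplication.run(PubsubApplication.class, args);
    }

}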

Next, create the configuration for the first project


@Configuration
public class Project1Config {

  private static final Logger LOGGER = LoggerFactory.getLogger(Project1Config.class);

  @Bean(name = "project1_IdProvider")
  public GcpProjectIdProvider project1_IdProvider() {
    return new DefaultGcpProjectIdProvider() {
      @Override
      public String getProjectId() {
        return "YOURPROJECTID";
      }
    };
  }

  @Bean(name = "project1_credentialsProvider")
  public CredentialsProvider project1_credentialsProvider() throws IOException {
    return new CredentialsProvider() {
      @Override
      public Credentials getCredentials() throws IOException {
        return ServiceAccountCredentials.fromStream(
            new ClassPathResource("YOURCREDENTIALS").getInputStream());
      }
    };
  }

  @Bean("project1_pubSubSubscriberTemplate")
  public PubSubSubscriberTemplate pubSubSubscriberTemplate(
          @Qualifier("project1_subscriberFactory") SubscriberFactory subscriberFactory) {
    return new PubSubSubscriberTemplate(subscriberFactory);
  }


  @Bean("project1_publisherFactory")
  public DefaultPublisherFactory publisherFactory(
          @Qualifier("project1_IdProvider") GcpProjectIdProvider projectIdProvider,
          @Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
    final DefaultPublisherFactory defaultPublisherFactory = new DefaultPublisherFactory(projectIdProvider);
    defaultPublisherFactory.setCredentialsProvider(credentialsProvider);
    return defaultPublisherFactory;
  }

  @Bean("project1_subscriberFactory")
  public DefaultSubscriberFactory subscriberFactory(
          @Qualifier("project1_IdProvider") GcpProjectIdProvider projectIdProvider,
          @Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
    final DefaultSubscriberFactory defaultSubscriberFactory = new DefaultSubscriberFactory(projectIdProvider);
    defaultSubscriberFactory.setCredentialsProvider(credentialsProvider);
    return defaultSubscriberFactory;
  }

  @Bean(name = "project1_pubsubInputChannel")
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }

  @Bean(name = "project1_pubSubTemplate")
  public PubSubTemplate project1_PubSubTemplate(
      @Qualifier("project1_publisherFactory") PublisherFactory publisherFactory,
      @Qualifier("project1_subscriberFactory") SubscriberFactory subscriberFactory,
      @Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
    if (publisherFactory instanceof DefaultPublisherFactory) {
      ((DefaultPublisherFactory) publisherFactory).setCredentialsProvider(credentialsProvider);
    }
    return new PubSubTemplate(publisherFactory, subscriberFactory);
  }

  @Bean(name = "project1_messageChannelAdapter")
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("project1_pubsubInputChannel") MessageChannel inputChannel,
      @Qualifier("project1_pubSubTemplate") PubSubTemplate pubSubTemplate) {

    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(pubSubTemplate, "YOURSUBSCRIPTIONNAME");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);
    return adapter;
  }

  @Bean("project1_messageReceiver")
  @ServiceActivator(inputChannel = "project1_pubsubInputChannel")
  public MessageHandler messageReceiver() {
    return message -> {
      LOGGER.info("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
      LOGGER.info("Message headers {}", message.getHeaders());
      BasicAcknowledgeablePubsubMessage originalMessage =
          message
              .getHeaders()
              .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
      originalMessage.ack();
    };
  }

  @Bean("project1_messageSender")
  @ServiceActivator(inputChannel = "project1_pubsubOutputChannel")
  public MessageHandler messageSender(
          @Qualifier("project1_pubSubTemplate") PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "YOURTOPICNAME");
  }
}
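
One detail worth noting in the class above: the anonymous CredentialsProvider re-reads and re-parses the key file on every getCredentials() call. A minimal sketch of caching the first successful result follows, using only the JDK. IoSupplier and CachingSource are hypothetical names that stand in for the provider; they are not part of Spring Cloud GCP or gax.

```java
import java.io.IOException;

/** Hypothetical stand-in for a provider whose load step may throw IOException. */
interface IoSupplier<T> {
    T get() throws IOException;
}

/** Wraps a loader so the underlying source is read at most once after a successful load. */
final class CachingSource<T> implements IoSupplier<T> {
    private final IoSupplier<T> delegate;
    private T cached; // stays null until the first successful load

    CachingSource(IoSupplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T get() throws IOException {
        if (cached == null) {
            // A failed load throws here and leaves the cache empty, so the next call retries.
            cached = delegate.get();
        }
        return cached;
    }
}
```

Applied to the configuration above, the same idea would mean loading the ServiceAccountCredentials once inside the bean method and returning that instance from getCredentials(). gax also ships FixedCredentialsProvider, which may serve the same purpose.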

Next, create the configuration for project 2


@Configuration
public class Project2Config {

  private static final Logger LOGGER = LoggerFactory.getLogger(Project2Config.class);

  @Bean(name = "project2_IdProvider")
  public DefaultGcpProjectIdProvider project2_IdProvider() {
    return new DefaultGcpProjectIdProvider() {
      @Override
      public String getProjectId() {
        return "project-id-lksjfkalsdjfkl";
      }
    };
  }

  @Bean(name = "project2_credentialsProvider")
  public CredentialsProvider project2_credentialsProvider() throws IOException {
    return new CredentialsProvider() {
      @Override
      public Credentials getCredentials() throws IOException {
        return ServiceAccountCredentials.fromStream(
            new ClassPathResource("project2.json").getInputStream());
      }
    };
  }

  @Bean("project2_pubSubSubscriberTemplate")
  public PubSubSubscriberTemplate pubSubSubscriberTemplate(
          @Qualifier("project2_subscriberFactory") SubscriberFactory subscriberFactory) {
    return new PubSubSubscriberTemplate(subscriberFactory);
  }

  @Bean("project2_publisherFactory")
  public DefaultPublisherFactory publisherFactory(
          @Qualifier("project2_IdProvider") GcpProjectIdProvider projectIdProvider,
          @Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
    final DefaultPublisherFactory defaultPublisherFactory = new DefaultPublisherFactory(projectIdProvider);
    defaultPublisherFactory.setCredentialsProvider(credentialsProvider);
    return defaultPublisherFactory;
  }

  @Bean("project2_subscriberFactory")
  public DefaultSubscriberFactory subscriberFactory(
          @Qualifier("project2_IdProvider") GcpProjectIdProvider projectIdProvider,
          @Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
    final DefaultSubscriberFactory defaultSubscriberFactory = new DefaultSubscriberFactory(projectIdProvider);
    defaultSubscriberFactory.setCredentialsProvider(credentialsProvider);
    return defaultSubscriberFactory;
  }

  @Bean(name = "project2_pubsubInputChannel")
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }

  @Bean(name = "project2_pubSubTemplate")
  public PubSubTemplate project2_PubSubTemplate(
      @Qualifier("project2_publisherFactory") PublisherFactory publisherFactory,
      @Qualifier("project2_subscriberFactory") SubscriberFactory subscriberFactory,
      @Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
    if (publisherFactory instanceof DefaultPublisherFactory) {
      ((DefaultPublisherFactory) publisherFactory).setCredentialsProvider(credentialsProvider);
    }
    return new PubSubTemplate(publisherFactory, subscriberFactory);
  }

  @Bean(name = "project2_messageChannelAdapter")
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("project2_pubsubInputChannel") MessageChannel inputChannel,
      @Qualifier("project2_pubSubTemplate") PubSubTemplate pubSubTemplate) {

    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(pubSubTemplate, "project2-testSubscription");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);
    return adapter;
  }

  @Bean("project2_messageReceiver")
  @ServiceActivator(inputChannel = "project2_pubsubInputChannel")
  public MessageHandler messageReceiver() {
    return message -> {
      LOGGER.info("Message Payload: " + new String((byte[]) message.getPayload()));
      LOGGER.info("Message headers {}", message.getHeaders());
      BasicAcknowledgeablePubsubMessage originalMessage =
          message
              .getHeaders()
              .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
      originalMessage.ack();
    };
  }

  @Bean(name = "project2_messageSender")
  @ServiceActivator(inputChannel = "project2_pubsubOutputChannel")
  public MessageHandler messageSender(
          @Qualifier("project2_pubSubTemplate") PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "project2-testTopic");
  }
}
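
Both configuration classes hard-code the project ID, the key file and the subscription name. If those values should come from configuration instead, a single properties file with a per-project key prefix appears to be enough; separate files are not required. The sketch below parses such keys with java.util.Properties. The pubsub.<name>.* keys and the ProjectSettings class are hypothetical and are not properties that Spring Cloud GCP itself recognizes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical holder for the values that one project's beans need. */
final class ProjectSettings {
    final String projectId;
    final String credentialsLocation;
    final String subscription;

    ProjectSettings(String projectId, String credentialsLocation, String subscription) {
        this.projectId = projectId;
        this.credentialsLocation = credentialsLocation;
        this.subscription = subscription;
    }

    /** Reads pubsub.<name>.project-id, .credentials and .subscription for each given name. */
    static Map<String, ProjectSettings> fromProperties(Properties props, String... names) {
        Map<String, ProjectSettings> result = new LinkedHashMap<>();
        for (String name : names) {
            String prefix = "pubsub." + name + ".";
            result.put(name, new ProjectSettings(
                    required(props, prefix + "project-id"),
                    required(props, prefix + "credentials"),
                    required(props, prefix + "subscription")));
        }
        return result;
    }

    /** Fails fast with the offending key so a missing entry is easy to spot at startup. */
    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }
}
```

In a Spring application the same keys could instead be injected into the bean methods with @Value, for example @Value("${pubsub.project1.project-id}"), again assuming these key names.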

Create an outbound gateway for project 1

project1_pubsubOutputChannel is the channel that the messageSender service activator in Project1Config listens on.

@Service
@MessagingGateway(defaultRequestChannel = "project1_pubsubOutputChannel")
public interface Project1PubsubOutboundGateway {

  void sendToPubsub(String text);
}

Create an outbound gateway for project 2

project2_pubsubOutputChannel is the channel that the messageSender service activator in Project2Config listens on.

@Service
@MessagingGateway(defaultRequestChannel = "project2_pubsubOutputChannel")
public interface Project2PubsubOutboundGateway {

  void sendToPubsub(String text);
}

Now everything is in place:


@RestController
public class WebAppController {

  // tag::autowireGateway[]
  @Autowired private Project1PubsubOutboundGateway project1PubsubOutboundGateway;
  @Autowired private Project2PubsubOutboundGateway project2PubsubOutboundGateway;
  // end::autowireGateway[]

  @PostMapping("/publishMessage")
  public ResponseEntity<String> publishMessage(@RequestParam("message") String message) {
    project1PubsubOutboundGateway.sendToPubsub(message);
    project2PubsubOutboundGateway.sendToPubsub(message);
    return ResponseEntity.ok("OK");
  }
}

Check the logs to see whether the messages are being delivered.

For more details, see the Git project: https://github.com/olgmaks/spring-gcppubsub-multiproject
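
When reading those log lines, keep in mind that the receivers above decode the payload with new String(byte[]), which uses the platform default charset. Assuming the publishers send UTF-8 text, decoding explicitly keeps the logged output the same on every machine. PayloadText is a hypothetical helper written only for this illustration.

```java
import java.nio.charset.StandardCharsets;

final class PayloadText {
    /** Returns a printable form of a message payload; byte arrays are decoded as UTF-8. */
    static String describe(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        // Any other payload type (including null) falls back to its string form.
        return String.valueOf(payload);
    }
}
```

In the receivers, the logging call would then become something like LOGGER.info("Message Payload: {}", PayloadText.describe(message.getPayload())).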


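That was really helpful. Thank you very much. - Kabilesh
Hi, very nice proposal, but I guess this is just a kind of PoC, right? From what I see in the original auto-configuration, the beans and the relationships between them are not as straightforward as in your proposal... We should analyze in detail what the original auto-configuration actually does and try to "replicate" that configuration in our "custom" approach, rather than just creating Default-* beans such as DefaultSubscriberFactory. Have you used this multi-configuration approach in production? - radekEm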

0

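I had a similar requirement: reading browser data from one GCP project and application data from a different project.

The prerequisite is a service account that has access to pull data from both projects.

Assume that application.properties defines defaultProjectId as project1 and insightsProjectId as project2; the values are read with the @Value annotation.

This worked for me in the Pubsubconfig.java class, which is the configuration bean for pubsub; I was able to read multiple subscriptions from two different projects.

Here is the code: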

PubSubSubscriberTemplate returnDefaultProject() {
    DefaultSubscriberFactory defaultFactory = new DefaultSubscriberFactory(() -> defaultProjectId);
    return new PubSubSubscriberTemplate(defaultFactory);
}

PubSubSubscriberTemplate returnInsightsProject() {
    DefaultSubscriberFactory insightsFactory = new DefaultSubscriberFactory(() -> insightsProjectId);
    return new PubSubSubscriberTemplate(insightsFactory);
}

@Bean(name = "browserChannelAdapter")
public PubSubInboundChannelAdapter browserChannelAdapter(
        @Qualifier("browserInputChannel") MessageChannel inputChannel) {
    PubSubInboundChannelAdapter adapter =
            new PubSubInboundChannelAdapter(returnInsightsProject(), brSubscriptionId);
    adapter.setOutputChannel(inputChannel);

    return adapter;
}

@Bean(name = "appChannelAdapter")
public PubSubInboundChannelAdapter appChannelAdapter(
        @Qualifier("appInputChannel") MessageChannel inputChannel) {
    PubSubInboundChannelAdapter adapter =
            new PubSubInboundChannelAdapter(returnDefaultProject(), appSubscriptionId);
    adapter.setOutputChannel(inputChannel);

    return adapter;
}
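
This variant never sets a CredentialsProvider on the factories, so it depends on Application Default Credentials, the same lookup that fails in the question's stack trace. A quick way to see whether the JVM actually sees GOOGLE_APPLICATION_CREDENTIALS, and whether it points at a readable file, is sketched below with the JDK only. AdcCheck is a hypothetical helper; it inspects only the environment variable, not the other sources that Application Default Credentials may consult, and it does not validate the file's contents.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class AdcCheck {
    enum Status { NOT_SET, NOT_READABLE, OK }

    /** Classifies a GOOGLE_APPLICATION_CREDENTIALS value without parsing the file. */
    static Status classify(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Status.NOT_SET;
        }
        Path path = Paths.get(value.trim());
        return Files.isRegularFile(path) && Files.isReadable(path)
                ? Status.OK
                : Status.NOT_READABLE;
    }

    /** Checks the variable as seen by this JVM process, which may differ from the shell. */
    static Status fromEnvironment() {
        return classify(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
    }
}
```

Logging AdcCheck.fromEnvironment() at startup can help here: a variable exported in a shell profile is not always inherited by a process started from an IDE, which is one possible explanation for the error in the question.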
