How to subscribe to multiple Google PubSub Projects in Spring GCP?
Asked Answered
I

2

8

I want to subscribe to multiple Google Cloud PubSub projects in a Spring Boot application. After reading the related questions in How to wire/configure two pubsub gcp projects in one spring boot application with spring cloud?, How to use Spring Cloud GCP for multiple google projects and https://github.com/spring-cloud/spring-cloud-gcp/issues/1639 I tried it as following. However, since there is no proper documentation or sample code for this, I am not clear about how to implement this. I get the below given error which seems to be caused because credentials are not loaded.

  • What is the proper way to implementation this?
  • How can I load credentials of different projects for configuring each InputChannel?
  • Can I have beans for different project Ids in the same Config file as following?
  • Do I need different properties files for each project Id?

PubSubConfig

Configurations for second PubSub project has been commented.

    package com.dialog.chatboard.config;

    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
    import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate;
    import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
    import org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.messaging.MessageChannel;

    @Configuration
    public class PubSubConfig {

        DefaultSubscriberFactory genieFactory = new DefaultSubscriberFactory(() -> "XXXXX-projectId-01");
        PubSubSubscriberTemplate genieSubscriberTemplate = new PubSubSubscriberTemplate(genieFactory);


//        DefaultSubscriberFactory retailHubFactory = new DefaultSubscriberFactory(() -> "projectId-02");
//        PubSubSubscriberTemplate retailHubSubscriberTemplate = new PubSubSubscriberTemplate(retailHubFactory);



        @Bean
        public MessageChannel genieInputChannel() {
            return new DirectChannel();
        }

        @Bean
        public PubSubInboundChannelAdapter genieChannelAdapter(
                @Qualifier("genieInputChannel") MessageChannel inputChannel) {
            PubSubInboundChannelAdapter adapter =
                    new PubSubInboundChannelAdapter(genieSubscriberTemplate, "agent-genie-sub");
            adapter.setOutputChannel(inputChannel);

            return adapter;
        }

//        @Bean
//        public MessageChannel retailHubInputChannel() {
//            return new DirectChannel();
//        }
//
//        @Bean
//        public PubSubInboundChannelAdapter retailHubChannelAdapter(
//                @Qualifier("retailHubInputChannel") MessageChannel inputChannel) {
//            PubSubInboundChannelAdapter adapter =
//                    new PubSubInboundChannelAdapter(retailHubSubscriberTemplate, "retail-hub-sub");
//            adapter.setOutputChannel(inputChannel);
//
//            return adapter;
//        }

    }

application.properties (For one ProjectId)

spring.cloud.gcp.project-id=XXXXX-projectId-01
spring.cloud.gcp.credentials.location=file:/home/XXXXXXXX/DialogFlow/XXXXXXXXXXXXX.json

Error

I have set GOOGLE_APPLICATION_CREDENTIALS for XXXXXXX-projectId-01 in Linux environment variable.

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'pubSubConfig' defined in file [/home/kabilesh/IdeaProjects/chatboard/target/classes/com/dialog/chatboard/config/PubSubConfig.class]: Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.dialog.chatboard.config.PubSubConfig$$EnhancerBySpringCGLIB$$8bcf7442]: Constructor threw exception; nested exception is java.lang.RuntimeException: Error creating the SubscriberStub
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateBean(AbstractAutowireCapableBeanFactory.java:1320) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1214) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:882) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at com.dialog.chatboard.ChatboardApplication.main(ChatboardApplication.java:28) [classes/:na]
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.dialog.chatboard.config.PubSubConfig$$EnhancerBySpringCGLIB$$8bcf7442]: Constructor threw exception; nested exception is java.lang.RuntimeException: Error creating the SubscriberStub
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:217) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:87) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateBean(AbstractAutowireCapableBeanFactory.java:1312) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 17 common frames omitted
Caused by: java.lang.RuntimeException: Error creating the SubscriberStub
at org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory.createSubscriberStub(DefaultSubscriberFactory.java:277) ~[spring-cloud-gcp-pubsub-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate.<init>(PubSubSubscriberTemplate.java:100) ~[spring-cloud-gcp-pubsub-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at com.dialog.chatboard.config.PubSubConfig.<init>(PubSubConfig.java:19) ~[classes/:na]
at com.dialog.chatboard.config.PubSubConfig$$EnhancerBySpringCGLIB$$8bcf7442.<init>(<generated>) ~[classes/:na]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_212]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_212]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_212]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_212]
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:204) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 19 common frames omitted
Caused by: java.io.IOException: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
at com.google.auth.oauth2.DefaultCredentialsProvider.getDefaultCredentials(DefaultCredentialsProvider.java:134) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:119) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:91) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.api.gax.core.GoogleCredentialsProvider.getCredentials(GoogleCredentialsProvider.java:67) ~[gax-1.54.0.jar:1.54.0]
at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:135) ~[gax-1.54.0.jar:1.54.0]
at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263) ~[google-cloud-pubsub-1.103.0.jar:1.103.0]
at org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory.createSubscriberStub(DefaultSubscriberFactory.java:274) ~[spring-cloud-gcp-pubsub-1.2.2.RELEASE.jar:1.2.2.RELEASE]
... 27 common frames omitted

Disconnected from the target VM, address: '127.0.0.1:34223', transport: 'socket'

Process finished with exit code 1
Ilana answered 29/4, 2020 at 5:54 Comment(0)
P
11

In order to do that you need

first of all turn off GCP autoconfiguration for pubsub

@SpringBootApplication(exclude = {
        GcpPubSubAutoConfiguration.class,
        GcpPubSubReactiveAutoConfiguration.class
})
public class PubsubApplication {

    public static void main(String[] args) {
        SpringApplication.run(PubsubApplication.class, args);
    }

}

then create config for first project


@Configuration
public class Project1Config {

  private static final Logger LOGGER = LoggerFactory.getLogger(Project1Config.class);

  @Bean(name = "project1_IdProvider")
  public GcpProjectIdProvider project1_IdProvider() {
    return new DefaultGcpProjectIdProvider() {
      @Override
      public String getProjectId() {
        return "YOURPROJECTID";
      }
    };
  }

  @Bean(name = "project1_credentialsProvider")
  public CredentialsProvider project1_credentialsProvider() throws IOException {
    return new CredentialsProvider() {
      @Override
      public Credentials getCredentials() throws IOException {
        return ServiceAccountCredentials.fromStream(
            new ClassPathResource("YOURCREDENTIALS").getInputStream());
      }
    };
  }

  @Bean("project1_pubSubSubscriberTemplate")
  public PubSubSubscriberTemplate pubSubSubscriberTemplate(
          @Qualifier("project1_subscriberFactory") SubscriberFactory subscriberFactory) {
    return new PubSubSubscriberTemplate(subscriberFactory);
  }


  @Bean("project1_publisherFactory")
  public DefaultPublisherFactory publisherFactory(
          @Qualifier("project1_IdProvider") GcpProjectIdProvider projectIdProvider,
          @Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
    final DefaultPublisherFactory defaultPublisherFactory = new DefaultPublisherFactory(projectIdProvider);
    defaultPublisherFactory.setCredentialsProvider(credentialsProvider);
    return defaultPublisherFactory;
  }

  @Bean("project1_subscriberFactory")
  public DefaultSubscriberFactory subscriberFactory(
          @Qualifier("project1_IdProvider") GcpProjectIdProvider projectIdProvider,
          @Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
    final DefaultSubscriberFactory defaultSubscriberFactory = new DefaultSubscriberFactory(projectIdProvider);
    defaultSubscriberFactory.setCredentialsProvider(credentialsProvider);
    return defaultSubscriberFactory;
  }

  @Bean(name = "project1_pubsubInputChannel")
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }

  @Bean(name = "project1_pubSubTemplate")
  public PubSubTemplate project1_PubSubTemplate(
      @Qualifier("project1_publisherFactory") PublisherFactory publisherFactory,
      @Qualifier("project1_subscriberFactory") SubscriberFactory subscriberFactory,
      @Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
    if (publisherFactory instanceof DefaultPublisherFactory) {
      ((DefaultPublisherFactory) publisherFactory).setCredentialsProvider(credentialsProvider);
    }
    return new PubSubTemplate(publisherFactory, subscriberFactory);
  }

  @Bean(name = "project1_messageChannelAdapter")
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("project1_pubsubInputChannel") MessageChannel inputChannel,
      @Qualifier("project1_pubSubTemplate") PubSubTemplate pubSubTemplate) {

    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(pubSubTemplate, "YOURSUBSCRIPTIONNAME");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);
    return adapter;
  }

  @Bean("project1_messageReceiver")
  @ServiceActivator(inputChannel = "project1_pubsubInputChannel")
  public MessageHandler messageReceiver() {
    return message -> {
      LOGGER.info("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
      LOGGER.info("Message headers {}", message.getHeaders());
      BasicAcknowledgeablePubsubMessage originalMessage =
          message
              .getHeaders()
              .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
      originalMessage.ack();
    };
  }

  @Bean("project1_messageSender")
  @ServiceActivator(inputChannel = "project1_pubsubOutputChannel")
  public MessageHandler messageSender(
          @Qualifier("project1_pubSubTemplate") PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "YOURTOPICNAME");
  }
}

Next - create config for project2


@Configuration
public class Project2Config {

  private static final Logger LOGGER = LoggerFactory.getLogger(Project2Config.class);

  @Bean(name = "project2_IdProvider")
  public DefaultGcpProjectIdProvider project2_IdProvider() {
    return new DefaultGcpProjectIdProvider() {
      @Override
      public String getProjectId() {
        return "project-id-lksjfkalsdjfkl";
      }
    };
  }

  @Bean(name = "project2_credentialsProvider")
  public CredentialsProvider project2_credentialsProvider() throws IOException {
    return new CredentialsProvider() {
      @Override
      public Credentials getCredentials() throws IOException {
        return ServiceAccountCredentials.fromStream(
            new ClassPathResource("project2.json").getInputStream());
      }
    };
  }

  @Bean("project2_pubSubSubscriberTemplate")
  public PubSubSubscriberTemplate pubSubSubscriberTemplate(
          @Qualifier("project2_subscriberFactory") SubscriberFactory subscriberFactory) {
    return new PubSubSubscriberTemplate(subscriberFactory);
  }

  @Bean("project2_publisherFactory")
  public DefaultPublisherFactory publisherFactory(
          @Qualifier("project2_IdProvider") GcpProjectIdProvider projectIdProvider,
          @Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
    final DefaultPublisherFactory defaultPublisherFactory = new DefaultPublisherFactory(projectIdProvider);
    defaultPublisherFactory.setCredentialsProvider(credentialsProvider);
    return defaultPublisherFactory;
  }

  @Bean("project2_subscriberFactory")
  public DefaultSubscriberFactory subscriberFactory(
          @Qualifier("project2_IdProvider") GcpProjectIdProvider projectIdProvider,
          @Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
    final DefaultSubscriberFactory defaultSubscriberFactory = new DefaultSubscriberFactory(projectIdProvider);
    defaultSubscriberFactory.setCredentialsProvider(credentialsProvider);
    return defaultSubscriberFactory;
  }

  @Bean(name = "project2_pubsubInputChannel")
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }

  @Bean(name = "project2_pubSubTemplate")
  public PubSubTemplate project2_PubSubTemplate(
      @Qualifier("project2_publisherFactory") PublisherFactory publisherFactory,
      @Qualifier("project2_subscriberFactory") SubscriberFactory subscriberFactory,
      @Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
    if (publisherFactory instanceof DefaultPublisherFactory) {
      ((DefaultPublisherFactory) publisherFactory).setCredentialsProvider(credentialsProvider);
    }
    return new PubSubTemplate(publisherFactory, subscriberFactory);
  }

  @Bean(name = "project2_messageChannelAdapter")
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("project2_pubsubInputChannel") MessageChannel inputChannel,
      @Qualifier("project2_pubSubTemplate") PubSubTemplate pubSubTemplate) {

    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(pubSubTemplate, "project2-testSubscription");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);
    return adapter;
  }

  @Bean("project2_messageReceiver")
  @ServiceActivator(inputChannel = "project2_pubsubInputChannel")
  public MessageHandler messageReceiver() {
    return message -> {
      LOGGER.info("Message Payload: " + new String((byte[]) message.getPayload()));
      LOGGER.info("Message headers {}", message.getHeaders());
      BasicAcknowledgeablePubsubMessage originalMessage =
          message
              .getHeaders()
              .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
      originalMessage.ack();
    };
  }

  @Bean(name = "project2_messageSender")
  @ServiceActivator(inputChannel = "project2_pubsubOutputChannel")
  public MessageHandler messageSender(
          @Qualifier("project2_pubSubTemplate") PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "project2-testTopic");
  }
}

Create outbound gateway for project 1

project1_pubsubOutputChannel - specified in Project1Config

@Service
@MessagingGateway(defaultRequestChannel = "project1_pubsubOutputChannel")
public interface Project1PubsubOutboundGateway {

  void sendToPubsub(String text);
}

Create outbound gateway for project 2

project2_pubsubOutputChannel - specified in Project2Config

@Service
@MessagingGateway(defaultRequestChannel = "project2_pubsubOutputChannel")
public interface Project2PubsubOutboundGateway {

  void sendToPubsub(String text);
}

Now we are successfull:


@RestController
public class WebAppController {

  // tag::autowireGateway[]
  @Autowired private Project1PubsubOutboundGateway project1PubsubOutboundGateway;
  @Autowired private Project2PubsubOutboundGateway project2PubsubOutboundGateway;
  // end::autowireGateway[]

  @PostMapping("/publishMessage")
  public ResponseEntity<String> publishMessage(@RequestParam("message") String message) {
    project1PubsubOutboundGateway.sendToPubsub(message);
    project2PubsubOutboundGateway.sendToPubsub(message);
    return ResponseEntity.ok("OK");
  }
}

Check logs to see messaging is working

checkout git project for more details: https://github.com/olgmaks/spring-gcppubsub-multiproject

Paule answered 9/5, 2020 at 22:8 Comment(2)
That was really helpful. Thanks a lot.Ilana
Hi, nice proposal of such non-standard configuration but I guess it is just a kind of PoC, right? From what I see in the original autoconfiguration the beans and their relationships between them are not so straightforward as in you proposal... Should not we analyse in details what original autoconfiguration really does and try to "reproduce" such configuration in our "custom" approach instead of just creating Default-* beans e.g. DefaultSubscriberFactory ?? Did you use such multi-config approach somewhere on production?Fascicle
J
0

I have the same type of requirement that I want to fetch browser data from another GCP Project and App data from the Different project.

Prerequisite to run this is that you need one serviceAccount which should be having access to fetch data from both the project.

Assume defaultProjectId is project1 and insightsProjectId is project2 in the application.properties which we will fetch values using @Value annotation.

This worked for me in the Pubsubconfig.java class which is a configuration bean for pubsub and I am able to read multiple subscriptions from the two different projects

Below is the code for that

PubSubSubscriberTemplate returnDefaultProject() {
    DefaultSubscriberFactory defaultFactory = new DefaultSubscriberFactory(() -> defaultProjectId);
    return new PubSubSubscriberTemplate(defaultFactory);
}

PubSubSubscriberTemplate returnInsightsProject() {
    DefaultSubscriberFactory insightsFactory = new DefaultSubscriberFactory(() -> insightsProjectId);
    return new PubSubSubscriberTemplate(insightsFactory);
}

@Bean(name = "browserChannelAdapter")
public PubSubInboundChannelAdapter browserChannelAdapter(
        @Qualifier("browserInputChannel") MessageChannel inputChannel) {
    PubSubInboundChannelAdapter adapter =
            new PubSubInboundChannelAdapter(returnInsightsProject(), brSubscriptionId);
    adapter.setOutputChannel(inputChannel);

    return adapter;
}

@Bean(name = "appChannelAdapter")
public PubSubInboundChannelAdapter appChannelAdapter(
        @Qualifier("appInputChannel") MessageChannel inputChannel) {
    PubSubInboundChannelAdapter adapter =
            new PubSubInboundChannelAdapter(returnDefaultProject(), appSubscriptionId);
    adapter.setOutputChannel(inputChannel);

    return adapter;
}
Jaynes answered 15/5, 2020 at 13:42 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.