RabbitMQ Spring Boot Session Error

2019-05-23 09:37发布

问题:

I am running into an issue when trying to pass a RabbitMQ queue message to my Spring Boot application. I am running spring data 4 and am using Neo4j as my database. I am able to recieve and parse a JSON string from the RabbitMQ queue to the spring boot app. Here is the thread change for the receiver process:

14:19:17.811 [main] INFO o.s.b.c.e.t.TomcatEmbeddedServletContainer - Tomcat started on port(s): 8080 (http) 14:19:17.813 [main] INFO org.directoryx.Application - Started Application in 4.989 seconds (JVM running for 5.47) 14:19:26.708 [SimpleAsyncTaskExecutor-1] INFO org.directoryx.sync.Receiver - Message Recv'd! {"lastName":"Okuneva", "country":"United States", "city":"New Stantonfort","org":"IT","photo":"profile9.jpg", "active":"true", "managerId":"1502", "title":"Global Integration Orchestrator", "userId":"frokune", "firstName":"Frank", "phone":"(485)544-1782 x913", "id":"6791", "email":"frokune@peoplemaker.com","status":"Employee"} 14:19:26.718 [SimpleAsyncTaskExecutor-1] INFO org.directoryx.sync.Receiver - Update Person: Frank Okuneva

but when I try to save the incoming JSON into a Neo4j node, the application crashes with the following error:

Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'receiver': Injection of autowired dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Could not autowire field: private org.springframework.data.neo4j.template.Neo4jTemplate

Before implementing the neo4jTemplate I was getting this error when I tried to save the person through my personService->personServiceImpl->personRepository pattern.

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.getSession': Scope 'session' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.

Here are the three classes involved:

Receiver.java

@Service
public class Receiver implements MessageListener {

static Logger log = LoggerFactory.getLogger(Receiver.class);

// Fails to be injected...
// Injection of autowired dependencies failed; nested exception is
// org.springframework.beans.factory.BeanCreationException: Could not autowire field:
@Autowired
private Neo4jTemplate neo4jTemplate;

public void onMessage(Message message) {
    Gson gson  = new GsonBuilder().create();
    String msg = new String(message.getBody());
    log.info("Message Recv'd! {}",msg);

    Person person = gson.fromJson(msg, Person.class);
    log.info("Update Person: {} {}", person.getFirstName(), person.getLastName());

    try {
        neo4jTemplate.save(person);
    } catch (Exception e) {
        log.error("Receiver failed to save Person: {}", e);
    }
}

DataSyncConfig.java

@Configuration
public class DataSyncConfig {


@Autowired
RabbitTemplate rabbitTemplate;
private final String QUEUE_NAME = "sync2";
private final String ROUTING_KEY = "routeme";
private final boolean DURABLE = true;
private final boolean EXCLUSIVE = true;
private final boolean AUTO_DELETE = true;

@Bean
Queue queue() {
    return new Queue(QUEUE_NAME, DURABLE, !EXCLUSIVE, !AUTO_DELETE);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange("etl-sync-tool");
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}

@Bean
public MessageConverter jsonMessageConverter() {
    JsonMessageConverter jsonMessageConverter = new JsonMessageConverter();
    return jsonMessageConverter;
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(QUEUE_NAME);
    container.setMessageListener(listenerAdapter);
    return container;
}

@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
    return new MessageListenerAdapter(receiver);
}
}

Neo4jConfig.java

@Configuration
@EnableNeo4jRepositories(basePackages = "org.directoryx.repository", queryLookupStrategy = QueryLookupStrategy.Key.CREATE_IF_NOT_FOUND)
@EnableTransactionManagement
public class Neo4jConfig extends Neo4jConfiguration {

private final Logger log = LoggerFactory.getLogger(Neo4jConfig.class);

@Resource
public Environment env;

@Override
@Bean
public Neo4jServer neo4jServer() {
    log.info("Initialising Neo4j server connection...");

    // username and password need to be available as System properties for Neo4j authentication.
    String user = env.getProperty("neo4j.user");
    System.setProperty("username", user);

    String pass = env.getProperty("neo4j.pass");
    System.setProperty("password", pass);

    log.info("connecting as neo4j.user=" + user);
    return new RemoteServer("http://localhost:7474");
}

@Override
@Bean
public SessionFactory getSessionFactory() {
    log.info("Initialising Session Factory");
    return new SessionFactory("org.directoryx.domain");
}

@Override
@Bean
// Unsure of scope value, tried both "session" and "global"
@Scope(value = "session", proxyMode = ScopedProxyMode.TARGET_CLASS)
public Session getSession() throws Exception {
    log.info("Initialising session-scoped Session Bean");
    return super.getSession();
}

@Bean
public Neo4jTemplate neo4jTemplate (Session session ) {
    return new Neo4jTemplate(session);
}
}

I can't seem to find out how to save the data to the db either by exposing and using the current thread or by creating a new thread. I feel that this is an issue that shouldn't be far off from some of the core functionality of Spring Boot and Spring Data, but I cant seem to determine what syntax to use even after extensive research.

回答1:

The error message suggests that running with a session scope isn't appropriate in your usage context, probably because you're in a RabbitMQ context as opposed to a Spring Web/MVC one.

I'd advise removing the @Scope annotation altogether from your getSession method in Neo4jConfig and see if that improves things. You can always add a user-managed scope later if you feel it's necessary to do so.

In addition, you may want to consider coding against Neo4jOperations instead of Neo4jTemplate. Some users have reported problems with injecting the class instead of the interface.



回答2:

I removed the @Scope annotation block and retained the overridden SessionFactory.

Neoj4Config.java

@Configuration
@EnableNeo4jRepositories(basePackages = "org.directoryx.repository",   queryLookupStrategy = QueryLookupStrategy.Key.CREATE_IF_NOT_FOUND)
@EnableTransactionManagement
public class Neo4jConfig extends Neo4jConfiguration {

private final Logger log = LoggerFactory.getLogger(Neo4jConfig.class);

@Resource
public Environment env;

@Override
@Bean
public Neo4jServer neo4jServer() {
    log.info("Initialising Neo4j server connection...");

    // username and password need to be available as System properties for Neo4j authentication.
    String user = env.getProperty("neo4j.user");
    System.setProperty("username", user);

    String pass = env.getProperty("neo4j.pass");
    System.setProperty("password", pass);

    log.info("connecting as neo4j.user=" + user);
    return new RemoteServer("http://localhost:7474");
}

@Override
@Bean
public SessionFactory getSessionFactory() {
    log.info("Initialising Session Factory");
    return new SessionFactory("org.directoryx.domain");
}
}