Estou implementando um sistema que recebe dados em tempo real de um serviço AMQP e preciso persistir esses dados em um banco de dados MySQL a medida que recebo as mensagens. Acontece que o volume de dados enviado e a frequência dos envio dos dados é muito grande. Por isso não estou conseguindo salvar no banco a medida que as mensagens chegam.
Gostaria de saber se existe alguma forma de implementar uma Queue de threads para a medida que fosse recebendo as mensagens do servidor AMQP, esta queue gerenciasse a entrada de Threads onde cada uma execute os batchs de sql (via Hibernate).
Se você estiver utilizando Java EE, pode usar a api de messageria JMS.
Você pode enfileirar as requisições em uma fila de mensagens, e processa-las de forma assíncrona em consumidores dessa fila. Dessa forma, você pode estabelecer uma política de consumo como achar melhor.
Exemplo:
Digamos que o sistema recebeu 10.000 mensagens (e continua recebendo). Essa mensagens estão enfileiradas num MOM. O container de aplicação vai começar a instanciar os message-driven beans sob demanda para consumir mensagens dessa fila, cada um na sua thread. Dado que cada mensagem é um dado a ser persistido, você pode fazer algo com essa cara:
publicvoidconsumir(Filafila){List<Mensagem>mensagensconsumidas=fila.consumirOuTimeout(100,10segundos);// consome 100 mensagens ou dá timeout em 10 segundos e retorna todas as que puderam ser consumidas// converte cada mensagem em uma entidaderepositorio.persistir(entidades);}
R
renatosousafilho
No caso é um projeto Java puro, sem interface web, apenas para consumo destas mensagens. Eu já possuo um sdk que me repassa as mensagens, mas para implementar a fila de mensagens para persistir os dados no banco que é minha real dificuldade no momento.
Já li alguns posts sobre ThreadPool.
L
lvbarbosa
Então, onde é o gargalo? Cada vez que chega uma mensagem é criada uma nova Thread? Ou é o fato de que vários comandos SQL pequenos estão sendo emitidos e você quer fazer em batch?
R
renatosousafilho
Considerando esta classe que acumula uma lista de clientes.
publicclassCustomersManager{privateLoggerlogger=LoggerFactory.getLogger("customer_manager");privateList<Customer>customers;publicCustomersManager(){customers=newArrayList<Customer>();}publicvoidaddCustomer(Customercustomer){customers.add(customer);}publicvoidperform(){logger.info("Amount of new customers"+customers.size());Sessionsession=newGenericDAO().getSession();session.beginTransaction();inti=0;for(Customercustomer:customers){logger.info("Saving customer: "+customer.getName());session.save(customer);if(i%20==0){//20, same as the JDBC batch size//flush a batch of inserts and release memory:session.flush();session.clear();}i++;}session.getTransaction().commit();session.close();}}
Eu teria esta classe que simula a criação simultanea de 10 lotees de clientes e salvaria cada lote usando a classe anterior.
publicclassCustomThreadPoolExecutor{privateRejectedExecutionHandlerImplrejectionHandler;privateThreadPoolExecutorexecutorPool;privateLoggerlogger=LoggerFactory.getLogger(CustomThreadPoolExecutor.class.getName());publicCustomThreadPoolExecutor(){rejectionHandler=newRejectedExecutionHandlerImpl();executorPool=newThreadPoolExecutor(2,4,10,TimeUnit.SECONDS,newArrayBlockingQueue<Runnable>(10),rejectionHandler);}publicvoidperform(CustomersManagercustomersManager)throwsInterruptedException{executorPool.execute(newRunnable(){@Overridepublicvoidrun(){logger.info("executando save para customer manager");customersManager.perform();try{Thread.sleep(30000);}catch(InterruptedExceptione){e.printStackTrace();}}});}}
e por fim a classe Main com a simução
publicstaticvoidmain(String[]args)throwsInterruptedException{CustomThreadPoolExecutorpoolExecutor=newCustomThreadPoolExecutor();for(inti=0;i<100;i++){CustomersManagercm=newCustomersManager();for(intj=0;j<10000;j++){Customercustomer=newCustomer();customer.setName("Customer #"+j);cm.addCustomer(customer);}poolExecutor.perform(cm);}System.out.println("finisah all tasks");}