2019独角兽企业重金招聘Python工程师标准>>>
本篇主要记录 Spring Cloud Stream 的使用,对 RabbitMQ 的安装和使用不做赘述。
- 创建stream-receiver作为消费者
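pom.xml:
<project>
    <parent>
        <artifactId>cloud-stream</artifactId>
        <groupId>com.cherrish</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>stream-receiver</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>
</project>
application.properties: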
spring.application.name=customer
server.port=7889

spring.rabbitmq.host=192.168.1.17
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

spring.cloud.stream.bindings.input.destination=sink-channel
# 不指定该输出通道无法接收消息
spring.cloud.stream.bindings.output.destination=sink-channel

Java 代码:
/**********************************CustomerApp.java*********************************/
package com.cherrish;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author cherrishccl* @time 2018-08-31 14:53* @name CustomerApp* @desc:*/
/**
 * Spring Boot entry point for the stream-receiver (consumer) module.
 */
@SpringBootApplication
public class CustomerApp {

    /** Class logger; final because it is a constant that is never reassigned. */
    private static final Logger logger = LoggerFactory.getLogger(CustomerApp.class);

    /**
     * Boots the Spring application context for this module.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(CustomerApp.class, args);
    }
}
/**********************************SinkReceiver.java*********************************/
package com.cherrish;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;import java.util.concurrent.atomic.AtomicInteger;/*** @author cherrish* @time 2018-08-31 14:39* @name SinkReceiver* @desc:*/
@EnableBinding(value = Sink.class)
public class SinkReceiver {private static final AtomicInteger NUM = new AtomicInteger(0);private static Logger log = LoggerFactory.getLogger(SinkReceiver.class);@StreamListener(Sink.INPUT)public void receive(Messagepayload) {log.info(NUM.getAndIncrement() + " Received : " + payload.getPayload());}
} -
- 创建stream-sender作为生产者
pom.xml:
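<project>
    <parent>
        <artifactId>cloud-stream</artifactId>
        <groupId>com.cherrish</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>stream-sender</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>
</project>
application.properties: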
spring.application.name=producer
server.port=7888

spring.rabbitmq.host=192.168.1.195
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

spring.cloud.stream.bindings.input.destination=sink-channel
spring.cloud.stream.bindings.output.destination=sink-channel
Java 代码:
/**********************************ProducerApp.java*********************************/
package com.cherrish;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;/*** @author cherrish* @time 2018-08-31 14:52* @name ProducerApp* @desc:*/
/**
 * Spring Boot entry point for the stream-sender (producer) module.
 * Scheduling is enabled here so that {@code @Scheduled} methods in this
 * module are executed.
 */
@EnableScheduling
@SpringBootApplication
public class ProducerApp {

    /** Class logger; final because it is a constant that is never reassigned. */
    private static final Logger logger = LoggerFactory.getLogger(ProducerApp.class);

    /**
     * Boots the Spring application context for this module.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(ProducerApp.class, args);
    }
}
/**********************************SinkSender.java*********************************/
package com.cherrish;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;/*** @author cherrish* @time 2018-08-30 16:16* @name SinkSender* @desc:*/
@EnableBinding(value = {Source.class})
public class SinkSender {private static Logger log = LoggerFactory.getLogger(SinkSender.class);private final static AtomicInteger NUM = new AtomicInteger(0);@InboundChannelAdapter(value = Source.OUTPUT,poller = @Poller(fixedRate = "3000"))public String timerMessageSource() {String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());log.info(NUM.getAndIncrement() + " Send message : " + format);return format;}@AutowiredSource source;public void send(String message){source.output().send(org.springframework.integration.support.MessageBuilder.withPayload(message).build());}
}
/**********************************ScheduleTimer.java*********************************/package com.cherrish;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.Date;/*** @author cherrish* @time 2018-09-03 10:58* @name ScheduleTimer* @desc:*/
@Component
public class ScheduleTimer {@AutowiredSinkSender sender;@Scheduled(fixedRate = 3000)public void send(){sender.send("定时调用发送消息" + new Date());}
}---------------------------
父pom.xml:
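<!-- 注:原文的 XML 标签在转载时丢失,以下结构根据残留的取值还原;标注"推测"的属性名无法从原文确认。 -->
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cherrish</groupId>
    <version>1.0-SNAPSHOT</version>
    <artifactId>cloud-stream</artifactId>
    <packaging>pom</packaging>
    <modules>
        <module>stream-sender</module>
        <module>stream-receiver</module>
    </modules>
    <properties>
        <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- 以下三个取值为 1.8 的属性名为推测 -->
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring-boot.version>2.0.4.RELEASE</spring-boot.version>
        <!-- 取值为 true 的属性名为推测 -->
        <maven.test.skip>true</maven.test.skip>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>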