如题,在处理海量数据的时候,生产者生产数据的速度远大于消费者,故为了平衡处理能力,增加多个消费者进行处理,但是这里有个问题就是,如果这批数据一旦有那个消费者处理出现问题,其他消费者要停止处理,等待下一批数据,有什么办法能做到吗?
3 回答
小唯快跑啊
TA贡献1863条经验 获得超2个赞
生产者类DataProducer
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.BlockingQueue;
public class DataProducer implements Observer, Runnable {
private BlockingQueue<String> blockingQueue;
private boolean handling = true;
public DataProducer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (handling) {
try
{
String str = handleData();
blockingQueue.offer(str);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private String handleData() {
return null;
}
@Override
public void update(Observable o, Object arg) {
if (arg.toString().equals("stopHandling")) {
System.out.println("stopHandling data");
handling = false;
blockingQueue.clear();
}
}
}消费者类DataConsumer
import java.util.Observable;
import java.util.concurrent.BlockingQueue;
public class DataConsumer extends Observable implements Runnable {
private BlockingQueue<String> blockingQueue;
public DataConsumer(BlockingQueue<String> blockingDeque) {
this.blockingQueue = blockingDeque;
}
@Override
public void run() {
while (true) {
try {
String str = blockingQueue.take();
handleData(str);
} catch (Exception e) {
notifyObservers("stopHandling");
}
}
}
private void handleData(String str) {
}
@Override
public void notifyObservers(Object arg) {
super.setChanged();
super.notifyObservers(arg);
}
}主线程启动任务,测试程序
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
public class TestData {
private static final ExecutorService executor = Executors.newFixedThreadPool(5);
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new LinkedTransferQueue<>();
DataProducer dataProducer = new DataProducer(blockingQueue);
executor.execute(dataProducer);
for (int i = 0; i < 4; i++) {
executor.execute(new DataConsumer(blockingQueue));
}
}
}
温温酱
TA贡献1752条经验 获得超4个赞
在传递给消费者的参数里加上状态,一但某个消费者处理出问题,就修改其状态标记问题,其它消费者检查到问题标记之后就不继续处理了(但是已经在处理过程中的仍然会继续,除非过程中多次检查状态)
添加回答
举报
0/150
提交
取消
