为了账号安全,请及时绑定邮箱和手机立即绑定

报错不知道怎么解决

https://img1.sycdn.imooc.com//5cea8f920001a37708400151.jpg

这个报错该怎么解决

正在回答

2 回答

KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
        .keyBy(new KeySelector<WikipediaEditEvent, String>() {
            @Override
            public String getKey(WikipediaEditEvent event) {
                return event.getUser();
            }
        });


0 回复 有任何疑惑可以回复我~
package org.myorg.quickstart;


import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

/**
 *  并行度,第n个子任务> (用户,改动的字节数)
 * 4> (Artegia,3)
 */
public class WikipediaAnalysis {
    public static void main(String[] args) throws Exception{

        // 创建一个Streaming程序运行的上下文
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // source部分--数据来源部分
        DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

        // 统计key
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
                .keyBy((KeySelector<WikipediaEditEvent, String>) event -> {
                    return event.getUser();
                });
        // 窗口
        DataStream<Tuple2<String, Long>> result = keyedEdits
                // 每5秒
                .timeWindow(Time.seconds(5))
                // 指定一个初识值
                .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) throws Exception {
                        acc.f0 = event.getUser();
                        acc.f1 += event.getByteDiff();
                        return acc;
                    }
                });
        result.print();
        env.execute();

    }
}


0 回复 有任何疑惑可以回复我~

举报

0/150
提交
取消

报错不知道怎么解决

我要回答 关注问题
意见反馈 帮助中心 APP下载
官方微信