Apache flink 已调用自定义Flink接收器,但没有数据

Apache flink 已调用自定义Flink接收器,但没有数据,apache-flink,flink-streaming,Apache Flink,Flink Streaming,我打算实现一个自定义接收器,在这里我创建了一个存根调用函数,它只将接收到的数据记录到任务日志文件(目前),如下所示 package io.name.package; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.slf4j.LoggerFactory; import org.slf4j.Logger; public class AlertSink extends RichSi

我打算实现一个自定义接收器,在这里我创建了一个存根调用函数,它只将接收到的数据记录到任务日志文件(目前),如下所示

package io.name.package;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

public class AlertSink extends RichSinkFunction<Alert> {
    Logger LOG = LoggerFactory.getLogger(AlertSink.class);

    @Override
    public void invoke(Alert alert, Context context) throws Exception {
        LOG.info("Invoking sink for alert: ", alert.toString());
    }
}
        DataStream<Alert> result = filteredMetrics
            .keyBy(
                new KeySelector<Tuple7<String, String, String, String, String, String, Object>, Tuple3<String, String, String>>() {
                    @Override
                    public  Tuple3<String, String, String> getKey(Tuple7<String, String, String, String, String, String, Object> in) throws Exception {
                        return Tuple3.of(in.f0, in.f1, in.f2);
                    }
            })
            .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .process(new ThresholdEvaluator());

        result.addSink(new AlertSink());
我是不是遗漏了什么

我还尝试在ThresholdEvaluator和addSink操作符之间添加映射函数。MapFunction似乎能够很好地接收警报对象,但不能接收警报接收器

        result.map(new MapFunction<Alert, Alert>() {
            @Override
            public Alert map(Alert value) {
                LOG.info(value.toString());
                return value;
            }
        }).addSink(new AlertSink());
result.map(新的MapFunction(){
@凌驾
公共警报映射(警报值){
LOG.info(value.toString());
返回值;
}
}).addSink(新的AlertSink());

(使用附加日志更新)

原因是日志输出没有用于插入值的占位符-正确的语法是

LOG.info("Invoking sink for alert: {}", alert.toString());
这和

LOG.info("Invoking sink for alert: " + alert.toString());

在后一种情况下,无论日志级别如何,每次都会发生字符串连接,在第一种情况下,如果日志级别至少为
INFO

,而不是
result.addSink(new AlertSink())
使用
result.print()
,则会插入该值,是否得到非空的结果?我想知道问题是否出在其他地方,例如
ThresholdEvaluator
。我没有尝试result.print(),但在ThresholdEvaluator发出输出之前,我也会记录输出警报,如下所示<代码>2020-08-05 19:38:16646信息io.name.package.ThresholdEvaluator-警报:{“thresholdID”:“123123123”,“分组”:“svc-integrations-14361530-demo-slack-token”,“period”:“5m”,“isActive”:true,“status”:“new”,“firstSeen”:1596656296645,“lastSeen”:1596656296645,“count”:1}我已经用ThresholdEvaluator日志更新了这个问题,哪些日志发出了输出(警报)。我看不出AlertSink有任何问题,所以我仍然怀疑
阈值计算器
。真奇怪。我尝试在ThresholdEvaluator和addSink操作符之间插入MapFunction。MapFunction能够很好地接收警报对象,但仍然不能接收警报接收器。
LOG.info("Invoking sink for alert: " + alert.toString());