Streaming Processing Model



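A streaming service (Streaming Service) uses a streaming processing model as its service source. The model specifies the information required to run the service.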

The streaming data processing workflow consists of receiving, filtering, transforming, and sending data.

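Based on this workflow, the streaming processing model consists of four parts: Receiver, Filter, Mapper, and Sender. Each part is a node, and nodes can be connected and merged to build a real-time data processing stream (Stream). Besides the Stream itself, some auxiliary parameters that define the runtime conditions of the whole service are stored together in the startup parameter type Startup. The processing model is shown in the figure below: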
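Because every node lists its upstream nodes in prevNodes and its downstream nodes in nextNodes (as in the examples below), the nodes of a Stream form a directed graph. The Python sketch below is a hypothetical helper, not part of the product: `execution_order` orders the entries of a nodeDic from upstream to downstream with Kahn's topological sort and reports unknown node names or cycles, which can help when checking how Receiver, Filter, Mapper, and Sender nodes are chained.

```python
from collections import deque


def execution_order(node_dic):
    """Return node names ordered from upstream to downstream.

    node_dic mirrors the "nodeDic" object of a streaming model: each value is
    a dict that may contain a "prevNodes" list of upstream node names.
    Raises ValueError on unknown references or cycles.
    """
    # Count unresolved upstream links for every node.
    indegree = {name: 0 for name in node_dic}
    downstream = {name: [] for name in node_dic}
    for name, node in node_dic.items():
        for prev in node.get("prevNodes", []):
            if prev not in node_dic:
                raise ValueError(f"{name} references unknown node {prev}")
            indegree[name] += 1
            downstream[prev].append(name)

    # Start from nodes without upstream nodes (typically Receivers).
    ready = deque(name for name, degree in indegree.items() if degree == 0)
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for nxt in downstream[current]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Any node never released is part of a cycle.
    if len(order) != len(node_dic):
        raise ValueError("the stream contains a cycle")
    return order


if __name__ == "__main__":
    stream = {
        "socketReceiver": {"prevNodes": []},
        "filter": {"prevNodes": ["socketReceiver"]},
        "insertMapper": {"prevNodes": ["filter"]},
        "fileSender": {"prevNodes": ["insertMapper"]},
    }
    print(execution_order(stream))
```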

Configuration Parameters

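The streaming processing model is defined in JSON. You can write a model file based on the parameters and JSON examples described below and publish it as a streaming service. You can also build the model with the streaming model editor, in which case you only need to consult the parameter descriptions.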

SparkParameter

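Sets the runtime parameters of Spark Streaming. The complete model examples in this section use checkPointDir (the checkpoint directory) and interval (the batch interval).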

Stream

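Stream contains the parameters of the real-time data processing flow. Its nodes are stored in the nodeDic object, keyed by node name.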

Receiver

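Inherits from StreamNode and serves as the entry point of stream processing, receiving data from various sources, including Socket, WebSocket, HTTP, and file systems. A Receiver must define the metadata of the messages it receives. A Receiver node consists of three parts: its own descriptive information, such as name and source; the message metadata (metadata); and the message reader format (reader).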

The streaming service supports the following receiver types:

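SocketReceiver: inherits from Receiver; a node that receives Socket messages. Required parameters: ipAddress (the IP address of the Socket server) and port (its port).

Example: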

{
  "ipAddress": "127.0.0.1",
  "port": 9527,
  "name": "socketReceiver",
  "source": "Socket Receiver",
  "description": "Receive some message from socketServer",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.SocketReceiver"
}
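To try SocketReceiver locally you need a Socket server listening at ipAddress:port. The Python sketch below is a hypothetical test server, not part of the product. It assumes that SocketReceiver connects as a TCP client and reads one CSV-formatted message per line; check that assumption against your reader configuration. `encode_csv_lines` and `serve_once` are illustrative names.

```python
import socket


def encode_csv_lines(rows, separator=","):
    """Join each row with the separator and end it with a newline (UTF-8)."""
    return "".join(
        separator.join(str(value) for value in row) + "\n" for row in rows
    ).encode("utf-8")


def serve_once(rows, host="127.0.0.1", port=9527, separator=","):
    """Accept one client connection (e.g. SocketReceiver) and send the rows."""
    with socket.create_server((host, port)) as server:
        conn, _address = server.accept()  # blocks until a client connects
        with conn:
            conn.sendall(encode_csv_lines(rows, separator))
```

Start `serve_once([[116.39, 39.91, "A001"]])` before the service, since it blocks until one client connects, sends the rows, and then closes.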

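MultiSocketReceiver: inherits from Receiver; a node that receives messages from multiple Socket servers at the same time. The messages from all servers must have the same content format. Required parameter: servers, a list of server addresses in the form IP:port.

Example: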

{
  "servers": [
    "192.168.1.1:9527",
    "192.168.1.1:9528",
    "192.168.1.2:9527"
  ],
  "name": "multiSocketReceiver",
  "source": "MultiSource Socket Receiver",
  "description": "Receive message from multi socket server",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.MultiSocketReceiver"
}

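SocketServerReceiver: inherits from Receiver; a Socket server receiver node that acts as a server and receives messages sent by Socket clients. Required parameter: port, the port on which it listens.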

{
  "port": 9527,
  "name": "socketServerReceiver",
  "source": "SocketServer Receiver",
  "description": "Receive message from socket client",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.SocketServerReceiver"
}

WebSocketReceiver: inherits from Receiver; a node that receives WebSocket messages. Required parameter: url, the WebSocket service address.

{
  "url": "ws://192.168.1.1:9527/websocket",
  "name": "webSocketReceiver",
  "source": "WebSocket Receiver",
  "description": "Receive message from websocket server",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.WebSocketReceiver"
}

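TextFileReceiver: inherits from Receiver; monitors the specified directory and reads the content of newly added files. Required parameter: directoryPath, the directory to monitor.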

{
  "directoryPath": "hdfs:///data/",
  "name": "textFileReceiver",
  "source": "Text File Receiver",
  "description": "Listen new file in folder",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.TextFileReceiver"
}

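SingleTextFileReceiver: a single text file receiver that inherits from Receiver. It reads the content of the monitored file according to its settings and supports JSON, GeoJSON, and CSV files. Its parameters are shown in the following complete model, which reads a JSON file and passes the records to ConsoleSender: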

{
  "version": 9000,
  "sparkParameter": {
    "checkPointDir": "tmp",
    "interval": 5000
  },
  "stream": {
    "nodeDic": {
      "TextFileReceiver": {
        "filePath": "G:\\QQRev\\test.json",
        "readInterval": 1000,
        "rowsOneTime": 100,
        "reader": {
          "isJsonArray": false,
          "arrayExpression": "",
          "className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
        },
        "metadata": {
          "title": "",
          "epsg": 3857,
          "fieldInfos": [
            {
              "name": "X",
              "source": "lon",
              "nType": "DOUBLE"
            },
            {
              "name": "Y",
              "source": "lat",
              "nType": "DOUBLE"
            },
            {
              "name": "mbbh",
              "source": "mbbh",
              "nType": "TEXT"
            }
          ],
          "featureType": "POINT",
          "idFieldName": "mbbh",
          "dateTimeFormat": "yyyy-MM-dd HH:mm:ss"
        },
        "name": "TextFileReceiver",
        "caption": "",
        "description": "",
        "prevNodes": [],
        "nextNodes": [
          "ConsoleSender"
        ],
        "className": "com.supermap.bdt.streaming.receiver.SingleTextFileReceiver"
      },
      "ConsoleSender": {
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
        },
        "name": "ConsoleSender",
        "caption": "",
        "description": "",
        "prevNodes": [
          "TextFileReceiver"
        ],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.ConsoleSender"
      }
    }
  }
}
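The read-interval setting (1000) and rowsOneTime (100) in this example suggest that the receiver reads the file in fixed-size batches at a fixed pace. The Python sketch below illustrates that reading pattern under this assumption only; `read_in_batches` is a hypothetical name and is not the receiver's implementation.

```python
import time
from itertools import islice


def read_in_batches(lines, rows_one_time=100, read_interval_ms=1000):
    """Yield lists of at most rows_one_time lines, pausing between batches.

    lines can be any iterable of strings, such as an open text file.
    """
    iterator = iter(lines)
    while True:
        batch = [line.rstrip("\n") for line in islice(iterator, rows_one_time)]
        if not batch:
            return
        yield batch
        # Wait before the next read, as a read interval would.
        time.sleep(read_interval_ms / 1000)
```

For example, iterate over `read_in_batches(f, 100, 1000)` where `f` is a file opened with `open("test.json", encoding="utf-8")`.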

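KafkaReceiver: inherits from Receiver; a node that receives Kafka messages. Required parameters: servers (comma-separated Kafka broker addresses), topics (the topics to subscribe to), groupid (the consumer group ID), and offset (the starting offset policy, e.g., latest).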

{
  "servers": "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092,192.168.1.4:9092",
  "topics": [
    "topic1",
    "topic2"
  ],
  "groupid": "groupId",
  "offset": "latest",
  "name": "kafkaReceiver",
  "source": "Kafka Receiver",
  "description": "Receive message from Kafka",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.KafkaReceiver"
}

HttpReceiver: inherits from Receiver; a node that receives HTTP messages. Currently only the HTTP GET method is supported. Required parameter: url.

{
  "url": "https://api.wheretheiss.at/v1/satellites/25544",
  "name": "httpReceiver",
  "source": "HTTP Receiver",
  "description": "Get message from web",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.HttpReceiver"
}

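JMSReceiver: inherits from Receiver; a node that receives messages over the JMS standard protocol, used to receive messages from message middleware such as ActiveMQ and RabbitMQ.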

{
  "url": "192.168.1.1",
  "port": 9527,
  "queueName": "data",
  "jdniName": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
  "userName": "user",
  "password": "password",
  "name": "jmsReceiver",
  "source": "JMS Receiver",
  "description": "Receive message from JMS(Java Message Service) for ActiveMQ",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.JMSReceiver"
}
metadata

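metadata is written in the Receiver parameters. It is the metadata of the received messages and describes their format. The examples in this section use the following items:

title: the title

epsg: the EPSG code of the coordinate system, e.g., 3857

fieldInfos: the field definitions; each entry has name (the output field name), source (the field or path in the message, e.g., location.x), and nType (the field type, e.g., DOUBLE or TEXT)

featureType: the geometry type, e.g., POINT

idFieldName: the name of the unique identifier field

dateTimeFormat: the date-time format, e.g., yyyy-MM-dd HH:mm:ss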
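The fieldInfos entries map values in each incoming message to typed output fields. The Python sketch below shows one way to read such a mapping. It assumes that source is a dot-separated path into nested JSON (as with location.x in the complete example at the end of this section) and that DOUBLE and TEXT correspond to floating-point and string values; `apply_field_infos`, `resolve`, and `CONVERTERS` are hypothetical names, not product APIs.

```python
def resolve(message, dotted_path):
    """Follow a dotted source path such as "location.x" through nested dicts."""
    value = message
    for key in dotted_path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None  # missing field
        value = value[key]
    return value


# Hypothetical mapping from the nType names used in this section to Python types.
CONVERTERS = {"DOUBLE": float, "TEXT": str}


def apply_field_infos(message, field_infos):
    """Build a flat {name: converted value} record from one parsed message."""
    record = {}
    for info in field_infos:
        raw = resolve(message, info["source"])
        convert = CONVERTERS.get(info["nType"], lambda v: v)
        record[info["name"]] = None if raw is None else convert(raw)
    return record
```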

reader

The content format of the received messages: CSV (CSVFormatter), JSON (JsonFormatter), or GeoJSON (GeoJsonFormatter).

CSVFormatter: the received messages are in CSV format. Specify the field separator (separator):

"reader": {
  "separator": ",",
  "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
}

JsonFormatter: the received messages are in JSON format. Example:

"reader": {
  "className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
}
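The complete model examples in this section also give JsonFormatter the options isJsonArray and arrayExpression, for instance isJsonArray set to true with arrayExpression set to airQualityList for a response whose records sit in an airQualityList array. The Python sketch below shows that interpretation; it assumes arrayExpression names a top-level property, and `extract_records` is a hypothetical helper rather than the formatter's code.

```python
import json


def extract_records(payload, is_json_array=False, array_expression=""):
    """Turn one JSON message into a list of records.

    is_json_array=False: the whole message is a single record.
    is_json_array=True: the message holds an array of records, either at the
    top level or under the property named by array_expression (assumed to be
    a top-level key).
    """
    data = json.loads(payload)
    if not is_json_array:
        return [data]
    if array_expression:
        data = data[array_expression]
    return list(data)
```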

GeoJsonFormatter: the received messages are in GeoJSON format. Example:

"reader": {
  "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
}

Filter

Inherits from StreamNode; filters the current data to clean and organize it.

Logical expression filter

String filter: the filter content, a logical expression. Objects for which the expression evaluates to true are retained.

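To use a field value in an expression, enclose the field name in brackets [], e.g., [ID] > 10. Multiple expressions can be combined with && (and) or || (or), and parentheses () change precedence, e.g., [ID] > 10 && ([X] >= 10 || [Y] <= 65.32). Note that when the keywords IN, MATCHES, EXISTS, or ISNULL are combined with other expressions, they must be enclosed in parentheses, e.g., ([ID] IN 1,3,5,7,9) && [X] > 100.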

Table 1 Logical operators supported by the filter parameter

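== (equal to): retains objects whose attribute value equals the specified value, e.g., [ID] == 3. Note: use with caution for double values; the comparison precision is 10E-10.

!= (not equal to): retains objects whose attribute value does not equal the specified value, e.g., [Name] != "A". Note: use with caution for double values; the comparison precision is 10E-10.

> (greater than): retains objects whose attribute value is greater than the specified value, e.g., [Speed] > 50.

>= (greater than or equal to): retains objects whose attribute value is greater than or equal to the specified value, e.g., [Speed] >= 50.

< (less than): retains objects whose attribute value is less than the specified value, e.g., [X] < 10.231.

<= (less than or equal to): retains objects whose attribute value is less than or equal to the specified value, e.g., [Y] <= 40.

IN (in the specified list): retains the object when the value of the specified field is in the comma-separated list of values, e.g., [Code] IN HK1,HK3,HK5.

MATCHES (regular expression match): retains the object when the value of the specified field matches the regular expression, e.g., [Code] MATCHES "^HK[135]". Note: the regular expression must be enclosed in double quotes.

EXISTS (field exists): retains the object when the specified field exists in the schema of the received event, e.g., EXISTS [X].

ISNULL (is null): retains the object when the specified field contains a null value, e.g., [X] ISNULL.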

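Important: In a filter expression, any string must be enclosed in double quotes (") or single quotes ('), and the quotes themselves must be escaped with \. For example, the string "string" must be written in the expression as \"string\" or \'string\'. In addition, in a MATCHES regular expression, each \ of the matching characters \\ must itself be escaped, so \\ must be written as \\\\. Example: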

{
  "filter": "([X] > [Y] && [X] > 20) || ([ID] IN 1,2,3,4,5)",
  "name": "Filter",
  "caption": "Attribute Filter",
  "description": "Filter feature by expression",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.filter.FeatureFilter"
}
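Writing these escapes by hand is error-prone. A minimal Python sketch, assuming the model file is produced by a script: keep the expression as plain text and let json.dumps add the JSON escaping, which turns each " into \" and each \ into \\, so the \\ inside a MATCHES pattern appears as \\\\ in the file. The field names and the pattern are hypothetical. The sketch uses only double quotes because standard JSON defines no \' escape sequence, so whether \' is accepted depends on the JSON parser used by the service.

```python
import json

# The filter expression as it should read after the JSON file is parsed.
# A raw string keeps the two backslashes of the MATCHES pattern literally.
expression = r'[Name] != "A" && ([Code] MATCHES "^HK\\d")'

node = {
    "filter": expression,
    "name": "Filter",
    "caption": "Attribute Filter",
    "description": "Filter feature by expression",
    "prevNodes": [],
    "nextNodes": [],
    "className": "com.supermap.bdt.streaming.filter.FeatureFilter",
}

# json.dumps escapes each double quote as \" and each backslash as \\.
encoded = json.dumps(expression)
print(encoded)
print(json.dumps(node, indent=2))
```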

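Geographic filter

Geographic filtering filters geographic objects based on spatial relationships. In the example below, the fence objects come from the dataset airports_40 in a UDB datasource, and mode is set to inside.

Example:

"geoFilter": {
        "connection": {
          "type": "udb",
          "info": [
            {
              "server": "Z:\\airport.udb",
              "datasetNames": [
                "airports_40"
              ]
            }
          ]
        },
        "mode": "inside",
        "name": "geoFilter",
        "caption": "GeoFencing Filter",
        "description": "Filter feature with geofencing",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.filter.GeoFilter"
      }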
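For point features, an "inside" relationship is commonly tested with ray casting. The Python sketch below only illustrates what keeping features inside a fence means; it is not GeoFilter's implementation. It assumes polygon fences given as vertex lists and features carrying X and Y fields, and `point_in_polygon` and `geo_filter_inside` are hypothetical names. Points lying exactly on a fence edge may be classified either way.

```python
def point_in_polygon(x, y, polygon):
    """Ray-casting test: True if (x, y) lies inside the polygon.

    polygon is a list of (x, y) vertices; the ring may be open or closed.
    """
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does the horizontal ray to the right of (x, y) cross this edge?
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def geo_filter_inside(features, fences):
    """Keep features whose (X, Y) point falls inside at least one fence."""
    return [
        f for f in features
        if any(point_in_polygon(f["X"], f["Y"], fence) for fence in fences)
    ]
```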

Mapper

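Inherits from StreamNode; creates field mappings and manages fields. Its functions include field mapping, adding fields, deleting fields, field calculation, and geofencing.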

Add field

"insertMapper": {
        "insertIndex": 1,
        "fieldName": "XX",
        "nType": "DOUBLE",
        "expression": "[X] * 2",
        "name": "insertMapper",
        "source": "Insert Field",
        "description": "Insert Field by X * 2",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureInsertMapper"
      }

Delete field

"deleteMapper": {
        "deleteFieldNames": [
          "F1",
          "F2"
        ],
        "name": "deleteMapper",
        "source": "delete Field",
        "description": "delete Field F1 and F2",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureDeleteMapper"
      }

Field mapping

"mapMapper": {
        "srcToDesNamePair": {
          "ID": "newID_Name",
          "Y": "newY_Name",
          "X": "newX_Name"
        },
        "srcToDesIndexPair": {
          "ID": 0,
          "Y": 2,
          "X": 1
        },
        "name": "mapMapper",
        "source": "Map Fields",
        "description": "Map Fields with new name and index",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureMapMapper"
      }
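Read together, srcToDesNamePair renames fields and srcToDesIndexPair gives each source field its position in the output. The Python sketch below applies that interpretation to the example's field names; `map_fields` is a hypothetical helper, and placing fields that are not listed in the index mapping after the listed ones is an assumption.

```python
def map_fields(record, name_pairs, index_pairs):
    """Rename and reorder the fields of one record.

    record: list of (field name, value) pairs in their current order.
    name_pairs: like srcToDesNamePair, {source name: new name}.
    index_pairs: like srcToDesIndexPair, {source name: new position}.
    Fields missing from index_pairs keep their order after the placed ones
    (an assumption; the product may handle them differently).
    """
    values = dict(record)
    # Fields listed in index_pairs, sorted by their target position.
    placed = [src for _pos, src in sorted((pos, src) for src, pos in index_pairs.items())]
    # Remaining fields in their original order.
    rest = [src for src, _value in record if src not in index_pairs]
    return [(name_pairs.get(src, src), values[src]) for src in placed + rest]
```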

Field calculation

"calculateMapper": {
        "fieldName": "Fcal",
        "expression": "[X] * 2",
        "name": "calculateMapper",
        "source": "calculate Field",
        "description": "calculate Field by X * 2",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureCalculateMapper"
      }

In addition to arithmetic operators such as +, -, *, and /, field calculation supports the following math and string functions:

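ABS(value): returns the absolute value of the argument.

FLOOR(double): returns the floor of the specified double value (the largest integer less than or equal to it).

MAX(value a, value b): returns the larger of the two arguments.

MIN(value a, value b): returns the smaller of the two arguments.

ROUND(value): returns the long integer closest to the argument (the argument is assumed to be a double).

NOW(): returns the current system time.

UPPERCASE: string function that returns the string in uppercase. For example, \"xyz\".UPPERCASE converts the string "xyz" to uppercase letters; the result is "XYZ".

LOWERCASE: string function that returns the string in lowercase. For example, \"ABC\".LOWERCASE converts the string "ABC" to lowercase letters; the result is "abc".

REPLACE(string1, string2): string function that replaces the string1 part of the original string with string2. For example, \"ABCxyz\".REPLACE(\"AB\",\"MM\") replaces "AB" in the string with "MM"; the result is MMCxyz.

SUBSTRING(location1, strcount): string function that returns strcount characters starting at position location1 of the original string. For example, \"ABCxyz\".SUBSTRING(0, 3) returns 3 characters starting from the first character; the result is ABC.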

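Note: In an expression, any string must be enclosed in double quotes (") or single quotes ('), and the quotes themselves must be escaped with \. For example, the string "string" must be written in the expression as \"string\" or \'string\'.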
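For checking expected results, the functions above behave like the following Python equivalents. This is a sketch, not the expression engine. It assumes that SUBSTRING uses zero-based positions (consistent with SUBSTRING(0, 3) returning ABC), that REPLACE replaces every occurrence, and that ROUND rounds halves up like Java's Math.round; the function names are hypothetical.

```python
import math


def uppercase(text):
    return text.upper()


def lowercase(text):
    return text.lower()


def replace(text, old, new):
    # Replaces every occurrence of old (assumed behavior of REPLACE).
    return text.replace(old, new)


def substring(text, location, count):
    # Zero-based start position: SUBSTRING(0, 3) starts at the first character.
    return text[location:location + count]


def round_half_up(value):
    # Nearest integer, halves rounded up like Java's Math.round (assumed).
    return math.floor(value + 0.5)


def floor_value(value):
    # Largest integer less than or equal to value.
    return math.floor(value)
```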

Geofence

"GeoFenceMapper": {
        "connection": {
          "type": "udb",
          "info": [
            {
              "server": "Z:\\airport.udb",
              "datasetNames": [
                "airports_40"
              ]
            }
          ]
        },
        "fenceName": "NAME",
        "fenceID": "SmID",
        "withinFieldName": "geoWithin",
        "statusFieldName": "geoStatus",
        "name": "GeoFenceMapper",
        "source": "Geotagging Mapper",
        "description": "",
        "prevNodes": [
          "SocketReceiver"
        ],
        "nextNodes": [
          "GeoJsonSocketSender",
          "FenceWithinFilterOut",
          "FenceWithinFilterIn"
        ],
        "className": "com.supermap.bdt.streaming.map.GeoTaggerMapper"
      }

Static resource extension

"StaticRDDJoinMapper": {
        "className": "com.supermap.bdt.streaming.map.StaticRDDJoinMapper",
        "caption": "StaticRDDJoinMapper",
        "name": "StaticRDDJoinMapper",
        "nextNodes": [],
        "prevNodes": [],
        "description": "StaticRDDJoinMapper",
        "csvFile": "D:\\supermap\\soft\\esriunittype.csv",
        "idFields": ["yard"]
      }

 

Sender

Inherits from StreamNode; serves as the exit point of stream processing and sends data out. The following senders are available:

WebSocketClientSender

A WebSocket sender node that sends messages to a WebSocket service.

String path: the WebSocket service address.

Note: if the data is received through the iServer subscription feature, only GeoJsonFormatter is supported.

Example:

"webSocketClientSender": {
        "path": "ws://127.0.0.1/data",
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
        },
        "name": "webSocketClientSender",
        "caption": "WebSocket Sender",
        "description": "Send message by WebSocket",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.WebSocketClientSender"
      }

EsAppendSender

EsAppendSender is a node that adds data to an Elasticsearch engine. It stores all incoming data and can be used when the historical data of a stream needs to be kept.

String url: the ES service address and port

String queueName: the ES index name

String directoryPath: the ES type name

Example:

"ESAppendSender": {
        "url": "127.0.0.1:9200",
        "queueName": "aircondition",
        "directoryPath": "test1",
        "formatter": {
          "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
        },
        "name": "ESAppendSender",
        "caption": "ES Sender",
        "description": "",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.EsAppendSender"
      }

EsUpdateSender

A node that adds and updates data in an Elasticsearch engine. You must set the ID field of the outgoing messages: if the value of the ID field in a message already exists in Elasticsearch, the corresponding record is updated; otherwise, a new record is added.

String url: the ES service address

String port: the ES service port

String index: the ES index name

String typ: the ES type name

String idFieldName: the name of the unique identifier field, used to find the record to update

Example:

"ESUpdateSender": {
        "url": "127.0.0.1",
        "port": "9200",
        "index": "aircondition",
        "typ": "test1",
        "idFieldName": "id",
        "name": "ESUpdateSender",
        "caption": "ESUpdateSender",
        "description": "Send message to Elasticsearch",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.ESUpdateSender"
      }
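The difference between EsAppendSender and EsUpdateSender can be summarized with an in-memory stand-in for an index. In the Python sketch below (hypothetical names, no Elasticsearch client involved), append keeps every record, while update inserts records with new idFieldName values and overwrites existing ones. Whether the real sender overwrites or merges document fields is an assumption.

```python
def update_sender(store, records, id_field_name="id"):
    """Insert or update records in store, a dict keyed by id value.

    A record whose id already exists replaces the stored document;
    otherwise a new entry is added. Returns (inserted, updated) counts.
    """
    inserted = updated = 0
    for record in records:
        key = record[id_field_name]
        if key in store:
            updated += 1
        else:
            inserted += 1
        store[key] = dict(record)
    return inserted, updated


def append_sender(log, records):
    """Append every record to log, keeping the full history."""
    log.extend(dict(r) for r in records)
    return len(records)
```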

FileSender

A file sender node that saves messages to the specified file.

String filePath: the output file path

Example:

"fileSender": {
        "filePath": "C:\\result.csv",
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
        },
        "name": "fileSender",
        "caption": "",
        "description": "",
        "prevNodes": [
          "filter"
        ],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.FileSender"
      }

JMSSender

A node that sends messages over the JMS standard protocol, used to send messages to message middleware such as ActiveMQ and RabbitMQ.

String url: the JMS message service address

Int port: the message service port

String queueName: the message queue name

String jdniName: the JNDI name for the corresponding message middleware (the example uses the ActiveMQ value); look it up in the middleware's official documentation

String userName: the user name

String password: the password

Example:

"JMSSender": {
        "url": "192.168.168.33",
        "port": 61616,
        "queueName": "myTestJDNI",
        "jdniName": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
        "userName": "admin",
        "password": "admin",
        "formatter": {
          "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
        },
        "name": "JMSSender",
        "caption": "",
        "description": "",
        "prevNodes": [
          "TextFileReceiver"
        ],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.JMSSender"
      }

SMSSender

SMS sender node. SMSSender sends SMS messages through the API provided by Webchinese (中国网建, http://www.webchinese.com.cn/). After registering a Webchinese account, you obtain a user name and an API security key, which are required to send SMS messages.

Note: content review takes up to about 30 minutes, so received messages may be delayed.

String user: the webchinese user name

String apiKey: the webchinese API security key

java.util.List[String] phoneNumbers: the list of recipient phone numbers

int sendLimit: the maximum number of messages sent in this run, which keeps frequent messages from using up the SMS quota

Example:

"smsSender": {
        "user": "user",
        "apiKey": "apiKey",
        "phoneNumbers": [
          "13800000000"
        ],
        "sendLimit": 100,
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
        },
        "name": "smsSender",
        "caption": "SMS Sender",
        "description": "Send message by SMS",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.SMSSender"
      }

SocketClientSender

Socket client sender node; it connects as a Socket client and sends messages to a Socket server.

String ip: the IP address of the receiving Socket service;

Int port: the port number of the receiving Socket service.

Example:

"socketClientSender": {
        "ip": "127.0.0.1",
        "port": 9527,
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
        },
        "name": "socketClientSender",
        "caption": "SocketClient Sender",
        "description": "Send message by Socket",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.SocketClientSender"
      }

SocketServerSender

Socket server sender node; it starts a Socket server and sends messages to the connected Socket clients.

Int port: the port on which the Socket service is started.

Example:

"socketServerSender": {
        "port": 9527,
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
        },
        "name": "socketServerSender",
        "caption": "SocketServer Sender",
        "description": "Send message by Socket",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.SocketServerSender"
      }
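To inspect what SocketServerSender sends, you can connect a Socket client to its port. The Python sketch below is a hypothetical client; it assumes one CSV-formatted message per line, using the separator from the formatter, and `read_messages` and `parse_csv_line` are illustrative names.

```python
import socket


def parse_csv_line(line, separator=","):
    """Split one CSV-formatted message into its field values (as strings)."""
    return line.rstrip("\r\n").split(separator)


def read_messages(host="127.0.0.1", port=9527, limit=10, separator=","):
    """Connect to SocketServerSender and return up to `limit` parsed lines."""
    messages = []
    with socket.create_connection((host, port)) as conn:
        with conn.makefile("r", encoding="utf-8") as stream:
            for line in stream:
                messages.append(parse_csv_line(line, separator))
                if len(messages) >= limit:
                    break
    return messages
```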

With settings like those in the examples above, the analysis results can be written to a spatiotemporal database created by iServer DataStore.

Writing a Streaming Processing Model File

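Based on the parameter descriptions above, write a complete streaming processing model file. You can save it as a file with the .streaming extension to publish a streaming service quickly, or paste the content of the model file directly into the "Configuration Information" field when publishing. Example: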

{
  "version": 9000,
  "sparkParameter": {
    "checkPointDir": "tmp",
    "interval": 10000
  },
  "stream": {
    "nodeDic": {
      "AQIReceiver": {
        "url": "http://www.supermapol.com/iserver/services/aqi/restjsr/aqi/pm2_5.json?bounds=-113.90625001585,-52.029966847235,113.90625001585,69.175579762077&to=910111",
        "reader": {
          "isJsonArray": true,
          "arrayExpression": "airQualityList",
          "className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
        },
        "metadata": {
          "title": "",
          "epsg": 3857,
          "fieldInfos": [
            {
              "name": "X",
              "source": "location.x",
              "nType": "DOUBLE"
            },
            {
              "name": "Y",
              "source": "location.y",
              "nType": "DOUBLE"
            },
            {
              "name": "positionName",
              "source": "positionName",
              "nType": "TEXT"
            },
            {
              "name": "aqi",
              "source": "aqi",
              "nType": "DOUBLE"
            }
          ],
          "featureType": "POINT"
        },
        "name": "AQIReceiver",
        "caption": "",
        "description": "",
        "prevNodes": [],
        "nextNodes": [
          "WebSocketClientSender"
        ],
        "className": "com.supermap.bdt.streaming.receiver.HttpReceiver"
      },
      "WebSocketClientSender": {
        "path": "ws://127.0.0.1:8800/iserver/services/dataflow/dataflow/broadcast",
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
        },
        "name": "WebSocketClientSender",
        "caption": "",
        "description": "",
        "prevNodes": [
          "AQIReceiver"
        ],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.WebSocketClientSender"
      }
    }
  }
}
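Before publishing, it can help to check that prevNodes and nextNodes agree, since each link is declared on both of its nodes. The Python sketch below uses hypothetical helpers: `check_links` reports mismatched or unknown node names, and `save_model` writes the model to a .streaming file as JSON. It checks only the links, not the node parameters.

```python
import json


def check_links(node_dic):
    """Return a list of prevNodes/nextNodes inconsistencies (empty if none)."""
    problems = []
    for name, node in node_dic.items():
        for nxt in node.get("nextNodes", []):
            if nxt not in node_dic:
                problems.append(f"{name}: unknown next node {nxt}")
            elif name not in node_dic[nxt].get("prevNodes", []):
                problems.append(f"{nxt}: prevNodes should contain {name}")
        for prev in node.get("prevNodes", []):
            if prev not in node_dic:
                problems.append(f"{name}: unknown previous node {prev}")
            elif name not in node_dic[prev].get("nextNodes", []):
                problems.append(f"{prev}: nextNodes should contain {name}")
    return problems


def save_model(model, path):
    """Validate the links, then write the model as a .streaming (JSON) file."""
    problems = check_links(model["stream"]["nodeDic"])
    if problems:
        raise ValueError("; ".join(problems))
    with open(path, "w", encoding="utf-8") as f:
        json.dump(model, f, ensure_ascii=False, indent=2)
```

For example, load the model above with json.load and call `save_model(model, "aqi.streaming")`.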