spark双流join
本帖最后由 168主编 于 2021-2-8 21:30 编辑flink流的join原理不同的是,Spark双流join是对俩个流做满外连接 ,因为网络延迟等关系,不能保证每个窗口中的数据key都能匹配上,这样势必会出现三种情况:(some,some),(None,some),(Some,None),根据这三种情况,下面做一下详细解析:
(some,some)—— 1号流和2号流中key能正常进行逻辑运算,但是考虑到2号流后续可能会有剩下的数据到来,所以需要将1号流中的key保存到redis,以等待接下来的数据(None,Some)—— 找不到1号流中对应key的数据,需要去redis中查找1号流的缓存,如果找不到,则缓存起来,等待1号流(Some,None)—— 找不到2号流中的数据,需要将key保存到redis,以等待接下来的数据,并且去reids中找2号流的缓存,如果有,则join,然后删除2号流的缓存
代码示例 def fullJoin(orderInfoStream: DStream, orderDetailStream: DStream) = {
val orderIdAndOrderInfo: DStream[(String, OrderInfo)] =
orderInfoStream.map(info => (info.id, info))
val orderIdAndOrderDetail: DStream[(String, OrderDetail)] =
orderDetailStream.map(info => (info.order_id, info))
orderIdAndOrderInfo
.fullOuterJoin(orderIdAndOrderDetail)
.mapPartitions((it: Iterator[(String, (Option, Option))]) => {
// 获取redis客户端
val client: Jedis = RedisUtil.getClient
// 读写操作
val result: Iterator = it.flatMap {
// order_info有数据, order_detail有数据
case (orderId, (Some(orderInfo), Some(orderDetail))) =>
println("Some(orderInfo) Some(orderDetail)")
// 1. 把order_info信息写入到缓存(因为order_detail信息有部分信息可能迟到)
cacheOrderInfo(orderInfo, client)
// 2. 把信息join到一起(其实就是放入一个样例类中)(缺少用户信息, 后面再专门补充)
val saleDetail = SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
// 3. 去order_detail的缓存找数据, 进行join
// 3.1 先获取这个order_id对应的所有的order_detail的key
import scala.collection.JavaConversions._
val keys: List = client.keys("order_detail:" + orderInfo.id + ":*").toList // 转成scala集合
val saleDetails: List = keys.map(key => {
val orderDetail: OrderDetail = JSON.parseObject(client.get(key), classOf)
// 删除对应的key, 如果不删, 有可能造成数据重复
client.del(key)
SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
})
saleDetail :: saleDetails
case (orderId, (Some(orderInfo), None)) =>
println("Some(orderInfo), None")
// 1. 把order_info信息写入到缓存(因为order_detail信息有部分信息可能迟到)
cacheOrderInfo(orderInfo, client)
// 3. 去order_detail的缓存找数据, 进行join
// 3.1 先获取这个order_id对应的所有的order_detail的key
import scala.collection.JavaConversions._
val keys: List = client.keys("order_detail:" + orderInfo.id + ":*").toList // 转成scala集合
val saleDetails: List = keys.map(key => {
val orderDetail: OrderDetail = JSON.parseObject(client.get(key), classOf)
// 删除对应的key, 如果不删, 有可能造成数据重复
client.del(key)
SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
})
saleDetails
case (orderId, (None, Some(orderDetail))) =>
println("None, Some(orderDetail)")
// 1. 去order_info的缓存中查找
val orderInfoJson = client.get("order_info:" + orderDetail.order_id)
if (orderInfoJson == null) {
// 3. 如果不存在, 则order_detail缓存
cacheOrderDetail(orderDetail, client)
Nil
} else {
// 2. 如果存在, 则join
val orderInfo = JSON.parseObject(orderInfoJson, classOf)
SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) :: Nil
}
}
// 关闭redis客户端
client.close()
result
})
}
本文作者:Sheep Sun
本文链接:https://www.cnblogs.com/yangxusun9/p/13137592.html
页:
[1]