stream源码分析(java stream源码)
本文目录一览:
- 1、linux源码分析
- 2、如何生成 streamgraph
- 3、大家对spark的源码了解多少,sparkshuffle,调度,sparkstreaming的源码?
- 4、如何看javawebsocket源码
- 5、java源代码分析 实在是不太会,求高手教教我。
- 6、pipedOutputStream源码flush方法求解
linux源码分析
linux的tcp-ip栈代码的详细分析
1.数据结构(msghdr,sk_buff,socket,sock,proto_ops,proto)
bsd套接字层,操作的对象是socket,数据存放在msghdr这样的数据结构:
创建socket需要传递family,type,protocol三个参数,创建socket其实就是创建一个socket实例,然后创建一个文件描述符结构,并且互相建立一些关联,即建立互相连接的指针,并且初始化这些对文件的写读操作映射到socket的read,write函数上来。
同时初始化socket的操作函数(proto_ops结构),如果传入的type参数是STREAM类型,那么就初始化为SOCKET-ops为inet_stream_ops,如果是DGRAM类型,则SOCKET-ops为inet_dgram_ops。对于inet_stream_ops其实是一个结构体,包含了stream类型的socket操作的一些入口函数,在这些函数里主要做的是对socket进行相关的操作,同时通过调用下面提到的sock中的相关操作完成socket到sock层的传递。比如在inet_stream_ops里有个inet_release的操作,这个操作除了释放socket的类型空间操作外,还通过调用socket连接的sock的close操作,对于stream类型来说,即tcp_close来关闭sock
释放sock。
创建socket同时还创建sock数据空间,初始化sock,初始化过程主要做的事情是初始化三个队列,receive_queue(接收到的数据包sk_buff链表队列),send_queue(需要发送数据包的sk_buff链表队列),backlog_queue(主要用于tcp中三次握手成功的那些数据包,自己猜的),根据family、type参数,初始化sock的操作,比如对于family为inet类型的,type为stream类型的,sock-proto初始化为tcp_prot.其中包括stream类型的协议sock操作对应的入口函数。
在一端对socket进行write的过程中,首先会把要write的字符串缓冲区整理成msghdr的数据结构形式(参见linux内核2.4版源代码分析大全),然后调用sock_sendmsg把msghdr的数据传送至inet层,对于msghdr结构中数据区中的每个数据包,创建sk_buff结构,填充数据,挂至发送队列。一层层往下层协议传递。一下每层协议不再对数据进行拷贝。而是对sk_buff结构进行操作。
如何生成 streamgraph
生成 StreamGraph 的源码分析
我们通过在DataStream上做了一系列的转换(map、filter等)得到了StreamTransformation集合,然后通过 StreamGraphGenerator.generate 获得StreamGraph,该方法的源码如下:
// 构造 StreamGraph 入口函数
public static StreamGraph generate(StreamExecutionEnvironment env, ListStreamTransformation? transformations) {
return new StreamGraphGenerator(env).generateInternal(transformations);
}
// 自底向上(sink-source)对转换树的每个transformation进行转换。
private StreamGraph generateInternal(ListStreamTransformation? transformations) {
for (StreamTransformation? transformation: transformations) {
transform(transformation);
}
return streamGraph;
}
// 对具体的一个transformation进行转换,转换成 StreamGraph 中的 StreamNode 和 StreamEdge
// 返回值为该transform的id集合,通常大小为1个(除FeedbackTransformation)
private CollectionInteger transform(StreamTransformation? transform) {
// 跳过已经转换过的transformation
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
// 为了触发 MissingTypeInfo 的异常
transform.getOutputType();
CollectionInteger transformedIds;
if (transform instanceof OneInputTransformation?, ?) {
transformedIds = transformOnInputTransform((OneInputTransformation?, ?) transform);
} else if (transform instanceof TwoInputTransformation?, ?, ?) {
transformedIds = transformTwoInputTransform((TwoInputTransformation?, ?, ?) transform);
} else if (transform instanceof SourceTransformation?) {
transformedIds = transformSource((SourceTransformation?) transform);
} else if (transform instanceof SinkTransformation?) {
transformedIds = transformSink((SinkTransformation?) transform);
} else if (transform instanceof UnionTransformation?) {
transformedIds = transformUnion((UnionTransformation?) transform);
} else if (transform instanceof SplitTransformation?) {
transformedIds = transformSplit((SplitTransformation?) transform);
} else if (transform instanceof SelectTransformation?) {
transformedIds = transformSelect((SelectTransformation?) transform);
} else if (transform instanceof FeedbackTransformation?) {
transformedIds = transformFeedback((FeedbackTransformation?) transform);
} else if (transform instanceof CoFeedbackTransformation?) {
transformedIds = transformCoFeedback((CoFeedbackTransformation?) transform);
} else if (transform instanceof PartitionTransformation?) {
transformedIds = transformPartition((PartitionTransformation?) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
if (transform.getBufferTimeout() 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
streamGraph.setTransformationId(transform.getId(), transform.getUid());
}
return transformedIds;
}
最终都会调用 transformXXX 来对具体的StreamTransformation进行转换。我们可以看下 transformOnInputTransform(transform) 的实现:
private IN, OUT CollectionInteger transformOnInputTransform(OneInputTransformationIN, OUT transform) {
// 递归对该transform的直接上游transform进行转换,获得直接上游id集合
CollectionInteger inputIds = transform(transform.getInput());
// 递归调用可能已经处理过该transform了
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
// 添加 StreamNode
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getOperator(),
transform.getInputType(),
transform.getOutputType(),
transform.getName());
if (transform.getStateKeySelector() != null) {
TypeSerializer? keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}
streamGraph.setParallelism(transform.getId(), transform.getParallelism());
// 添加 StreamEdge
for (Integer inputId: inputIds) {
streamGraph.addEdge(inputId, transform.getId(), 0);
}
return Collections.singleton(transform.getId());
}
该函数首先会对该transform的上游transform进行递归转换,确保上游的都已经完成了转化。然后通过transform构造出StreamNode,最后与上游的transform进行连接,构造出StreamNode。
最后再来看下对逻辑转换(partition、union等)的处理,如下是 transformPartition 函数的源码:
private T CollectionInteger transformPartition(PartitionTransformationT partition) {
StreamTransformationT input = partition.getInput();
ListInteger resultIds = new ArrayList();
// 直接上游的id
CollectionInteger transformedIds = transform(input);
for (Integer transformedId: transformedIds) {
// 生成一个新的虚拟id
int virtualId = StreamTransformation.getNewNodeId();
// 添加一个虚拟分区节点,不会生成 StreamNode
streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
resultIds.add(virtualId);
}
return resultIds;
}
对partition的转换没有生成具体的StreamNode和StreamEdge,而是添加一个虚节点。当partition的下游transform(如map)添加edge时(调用 StreamGraph.addEdge ),会把partition信息写入到edge中。如 StreamGraph.addEdgeInternal 所示:
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, null, new ArrayListString());
}
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner? partitioner,
ListString outputNames) {
// 当上游是select时,递归调用,并传入select信息
if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
// select上游的节点id
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
}
// 当上游是partition时,递归调用,并传入partitioner信息
else if (virtuaPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
// partition上游的节点id
upStreamVertexID = virtuaPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtuaPartitionNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
} else {
// 真正构建StreamEdge
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// 未指定partitioner的话,会为其选择 forward 或 rebalance 分区。
if (partitioner == null upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitionerObject();
} else if (partitioner == null) {
partitioner = new RebalancePartitionerObject();
}
// 健康检查, forward 分区必须要上下游的并发度一致
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException("Forward partitioning does not allow " +
"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
// 创建 StreamEdge
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
// 将该 StreamEdge 添加到上游的输出,下游的输入
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
大家对spark的源码了解多少,sparkshuffle,调度,sparkstreaming的源码?
流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流;既然是数据流处理,就会想到数据的流入、数据的加工、数据的流出。
日常工作、生活中数据来源很多不同的地方。例如:工业时代的汽车制造、监控设备、工业设备会产生很多源数据;信息时代的电商网站、日志服务器、社交网络、金融交易系统、黑客攻击、垃圾邮件、交通监控等;通信时代的手机、平板、智能设备、物联网等会产生很多实时数据,数据流无处不在。
在大数据时代Spark Streaming能做什么?
平时用户都有网上购物的经历,用户在网站上进行的各种操作通过Spark Streaming流处理技术可以被监控,用户的购买爱好、关注度、交易等可以进行行为分析。在金融领域,通过Spark Streaming流处理技术可以对交易量很大的账号进行监控,防止罪犯洗钱、财产转移、防欺诈等。在网络安全性方面,黑客攻击时有发生,通过Spark Streaming流处理技术可以将某类可疑IP进行监控并结合机器学习训练模型匹配出当前请求是否属于黑客攻击。其他方面,如:垃圾邮件监控过滤、交通监控、网络监控、工业设备监控的背后都是Spark Streaming发挥强大流处理的地方。
大数据时代,数据价值一般怎么定义?
所有没经过流处理的数据都是无效数据或没有价值的数据;数据产生之后立即处理产生的价值是最大的,数据放置越久或越滞后其使用价值越低。以前绝大多数电商网站盈利走的是网络流量(即用户的访问量),如今,电商网站不仅仅需要关注流量、交易量,更重要的是要通过数据流技术让电商网站的各种数据流动起来,通过实时流动的数据及时分析、挖掘出各种有价值的数据;比如:对不同交易量的用户指定用户画像,从而提供不同服务质量;准对用户访问电商网站板块爱好及时推荐相关的信息。
SparkStreaming VS Hadoop MR:
Spark Streaming是一个准实时流处理框架,而Hadoop MR是一个离线、批处理框架;很显然,在数据的价值性角度,Spark Streaming完胜于Hadoop MR。
SparkStreaming VS Storm:
Spark Streaming是一个准实时流处理框架,处理响应时间一般以分钟为单位,也就是说处理实时数据的延迟时间是秒级别的;Storm是一个实时流处理框架,处理响应是毫秒级的。所以在流框架选型方面要看具体业务场景。需要澄清的是现在很多人认为Spark Streaming流处理运行不稳定、数据丢失、事务性支持不好等等,那是因为很多人不会驾驭Spark Streaming及Spark本身。在Spark Streaming流处理的延迟时间方面,Spark定制版本,会将Spark Streaming的延迟从秒级别推进到100毫秒之内甚至更少。
SparkStreaming优点:
1、提供了丰富的API,企业中能快速实现各种复杂的业务逻辑。
2、流入Spark Streaming的数据流通过和机器学习算法结合,完成机器模拟和图计算。
3、Spark Streaming基于Spark优秀的血统。
SparkStreaming能不能像Storm一样,一条一条处理数据?
Storm处理数据的方式是以条为单位来一条一条处理的,而Spark Streaming基于单位时间处理数据的,SparkStreaming能不能像Storm一样呢?答案是:可以的。
业界一般的做法是Spark Streaming和Kafka搭档即可达到这种效果,入下图:
Kafka业界认同最主流的分布式消息框架,此框架即符合消息广播模式又符合消息队列模式。
Kafka内部使用的技术:
1、 Cache
2、 Interface
3、 Persistence(默认最大持久化一周)
4、 Zero-Copy技术让Kafka每秒吞吐量几百兆,而且数据只需要加载一次到内核提供其他应用程序使用
外部各种源数据推进(Push)Kafka,然后再通过Spark Streaming抓取(Pull)数据,抓取的数据量可以根据自己的实际情况确定每一秒中要处理多少数据。
通过Spark Streaming动手实战wordCount实例
这里是运行一个Spark Streaming的程序:统计这个时间段内流进来的单词出现的次数. 它计算的是:他规定的时间段内每个单词出现了多少次。
1、先启动下Spark集群:
我们从集群里面打开下官方网站
接受这个数据进行加工,就是流处理的过程,刚才那个WordCount就是以1s做一个单位。
刚才运行的时候,为什么没有结果呢?因为需要数据源。
2、获取数据源:
新开一个命令终端,然后输入:
$ nc -lk 9999
现在我们拷贝数据源进入运行:
然后按回车运行
DStream和RDD关系:
没有输入数据会打印的是空结果:
但是实际上,Job的执行是Spark Streaming框架帮我们产生的和开发者自己写的Spark代码业务逻辑没有关系,而且Spark Streaming框架的执行时间间隔可以手动配置,如:每隔一秒钟就会产生一次Job的调用。所以在开发者编写好的Spark代码时(如:flatmap、map、collect),不会导致job的运行,job运行是Spark Streaming框架产生的,可以配置成每隔一秒中都会产生一次job调用。
Spark Streaming流进来的数据是DStream,但Spark Core框架只认RDD,这就产生矛盾了?
Spark Streaming框架中,作业实例的产生都是基于rdd实例来产生,你写的代码是作业的模板,即rdd是作业的模板,模板一运行rdd就会被执行,此时action必须处理数据。RDD的模板就是DStream离散流,RDD之间存在依赖关系,DStream就有了依赖关系,也就构成了DStream 有向无环图。这个DAG图,是模板。Spark Streaming只不过是在附在RDD上面一层薄薄的封装而已。你写的代码不能产生Job,只有框架才能产生Job.
如果一秒内计算不完数据,就只能调优了.
总结:
使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。
如何看javawebsocket源码
import java.io.IOException;
import java.io.InputStream;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
@ServerEndpoint("/log")
public class LogWebSocketHandle {
private Process process;
private InputStream inputStream;
/**
* 新stream源码分析的WebSocket请求开启
*/
@OnOpen
public void onOpen(Session session) {
try {
// 执行tail -f命令
process = Runtime.getRuntime().exec("tail -f /var/log/syslog");
inputStream = process.getInputStream();
// 一定要启动新stream源码分析的线程stream源码分析,防止InputStream阻塞处理WebSocketstream源码分析的线程
TailLogThread thread = new TailLogThread(inputStream, session);
thread.start();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* WebSocket请求关闭
*/
@OnClose
public void onClose() {
try {
if(inputStream != null)
inputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
if(process != null)
process.destroy();
}
@OnError
public void onError(Throwable thr) {
thr.printStackTrace();
}
}
java源代码分析 实在是不太会,求高手教教我。
package test2;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class JavaCodeAnalyzer {
public static void analyze(File file) throws IOException{
//FileOutputStream fos = new FileOutputStream("F;"+File.separator+"result.txt");
if(!(file.getName().endsWith(".txt")||file.getName().endsWith(".java"))){
System.out.println("输入的分析文件格式不对!");
}
InputStream is= new FileInputStream(file);
BufferedReader br= new BufferedReader(new InputStreamReader(is));
String temp;
int count=0;
int countSpace=0;
int countCode=0;
int countDesc=0;
MapString, Integer map = getKeyWords();
while((temp=br.readLine())!=null){
countKeys(temp, map);
count++;
if(temp.trim().equals("")){
countSpace++;
}else if(temp.trim().startsWith("/*")||temp.trim().startsWith("//")){
countDesc++;
}else{
countCode++;
}
}
System.out.printf("代码行数:"+countCode+"占总行数的%4.2f\n",(double)countCode/count);
System.out.printf("空行数:"+countSpace+"占总行数的%4.2f\n",(double)countSpace/count);
System.out.printf("注释行数:"+countDesc+"占总行数的%4.2f\n",(double)countDesc/count);
System.out.println("总行数:"+count);
System.out.println("出现最多的5个关键字是:");
System.out.println("");
System.out.println("");
System.out.println("");
System.out.println("");
System.out.println("");
}
public static void main(String[] args) {
getKeyWords();
File file = new File("F://Test.java");
try {
analyze(file);
} catch (IOException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
}
}
public static MapString,Integer getKeyWords(){
MapString,Integer map = new HashMapString, Integer();
String[]keywords = {"abstract","assert","boolean","break","byte","case","catch","char","class","continue","default","do","double","else","enum","extends","final","finally","float","for","if","implements","import","instanceof","int","interface","long","native","new","package","private","protected","public","return"," strictfp","short","static","super"," switch","synchronized","this","throw","throws","transient","try","void","volatile","while","goto","const"};
for(String s:keywords){
map.put(s, 0);
}
return map;
}
public static void countKeys(String s,MapString,Integer map){
SetString keys = map.keySet();
for(String ss:keys){
if(s.indexOf(ss)!=-1){
map.put(ss, map.get(ss)+1);
}
}
}
}
上班没啥时间了,还有点没写完,你在想想。
pipedOutputStream源码flush方法求解
NotifyAll唤醒stream源码分析的是线程池中所有线程stream源码分析,在这里应该是想唤醒pipedInputStream中stream源码分析的线程吧