jvm运动数据区总结(JVM上高性能数据格式库包Apache Arrow入门和架构详解Gkatziouras)
jvm运动数据区总结
JVM上高性能数据格式库包Apache Arrow入门和架构详解GkatziourasApache Arrow是是各种大数据工具(包括BigQuery)使用的一种流行格式,它是平面和分层数据的存储格式。它是一种加快应用程序内存密集型。
数据处理和数据科学领域中的常用库: Apache Arrow 。诸如Apache Parquet,Apache Spark,pandas之类的开放源代码项目以及许多商业或封闭源代码服务都使用Arrow。它提供以下功能:
- 内存计算
- 标准化的柱状存储格式
- 一个IPC和RPC框架,分别用于进程和节点之间的数据交换
让我们看一看在Arrow出现之前事物是如何工作的:
我们可以看到,为了使Spark从Parquet文件中读取数据,我们需要以Parquet格式读取和反序列化数据。这要求我们通过将数据加载到内存中来制作数据的完整副本。首先,我们将数据读入内存缓冲区,然后使用Parquet的转换方法将数据(例如字符串或数字)转换为我们的编程语言的表示形式。这是必需的,因为Parquet表示的数字与Python编程语言表示的数字不同。
由于许多原因,这对于性能来说是一个很大的问题:
- 我们正在复制数据并在其上运行转换步骤。数据的格式不同,我们需要对所有数据进行读取和转换,然后再对数据进行任何计算。
- 我们正在加载的数据必须放入内存中。您只有8GB的RAM,数据是10GB吗?你真倒霉!
现在,让我们看一下Apache Arrow如何改进这一点:
Arrow无需复制和转换数据,而是了解如何直接读取和操作数据。为此,Arrow社区定义了一种新的文件格式以及直接对序列化数据起作用的操作。可以直接从磁盘读取此数据格式,而无需将其加载到内存中并转换/反序列化数据。当然,部分数据仍将被加载到RAM中,但您的数据不必放入内存中。Arrow使用其文件的内存映射功能,仅在必要和可能的情况下将尽可能多的数据加载到内存中。
Apache Arrow支持以下语言:
- C++
- C#
- Go
- Java
- JavaScript
- Rust
- Python (through the C++ library)
- Ruby (through the C++ library)
- R (through the C++ library)
- MATLAB (through the C++ library).
Arrow首先是提供用于内存计算的列式数据结构的库,可以将任何数据解压缩并解码为Arrow柱状数据结构,以便随后可以对解码后的数据进行内存内分析。Arrow列格式具有一些不错的属性:随机访问为O(1),每个值单元格在内存中的前一个和后一个相邻,因此进行迭代非常有效。
Apache Arrow定义了一种二进制“序列化”协议,用于安排Arrow列数组的集合(称为“记录批处理”),该数组可用于消息传递和进程间通信。您可以将协议放在任何地方,包括磁盘上,以后可以对其进行内存映射或读入内存并发送到其他地方。
Arrow协议的设计目的是使您可以“映射”一个Arrow数据块而不进行任何反序列化,因此对磁盘上的Arrow协议数据执行分析可以使用内存映射并有效地支付零成本。该协议用于很多事情,例如Spark SQL和Python之间的流数据,用于针对Spark SQL数据块运行pandas函数,这些被称为“ pandas udfs”。
Arrow是为内存而设计的(但是您可以将其放在磁盘上,然后再进行内存映射)。它们旨在相互兼容,并在应用程序中一起使用,而其竞争对手Apache Parquet文件是为磁盘存储而设计的。
优点:Apache Arrow为平面和分层数据定义了一种独立于语言的列式存储格式,该格式组织为在CPU和GPU等现代硬件上进行高效的分析操作而组织。Arrow存储器格式还支持零拷贝读取,以实现闪电般的数据访问,而无需序列化开销。
Java的Apache Arrow导入库:
<dependency> <groupId>org.apache.arrow</groupId> <artifactId>arrow-memory-netty</artifactId> <version>${arrow.version}</version> </dependency> <dependency> <groupId>org.apache.arrow</groupId> <artifactId>arrow-vector</artifactId> <version>${arrow.version}</version> </dependency>
在开始之前,必须了解对于Arrow的读/写操作,使用了字节缓冲区。诸如读取和写入之类的操作是字节的连续交换。为了提高效率,Arrow附带了一个缓冲区分配器,该缓冲区分配器可以具有一定的大小,也可以具有自动扩展功能。支持分配管理的库是arrow-memory-netty和arrow-memory-unsafe。我们这里使用netty。
用Arrow存储数据需要一个模式,模式可以通过编程定义:
package com.gkatzioura.arrow; import java.io.IOException; import java.util.List; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; public class SchemaFactory { public static Schema DEFAULT_SCHEMA = createDefault(); public static Schema createDefault() { var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null); var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null); return new Schema(List.of(strField, intField)); } public static Schema schemaWithChildren() { var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null); var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null); var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency)); return new Schema(List.of(itemField)); } public static Schema fromJson(String jsonString) { try { return Schema.fromJSON(jsonString); } catch (IOException e) { throw new ArrowExampleException(e); } } }
他们也有一个可解析的json表示形式:
{ "fields" : [ { "name" : "col1", "nullable" : true, "type" : { "name" : "utf8" }, "children" : [ ] }, { "name" : "col2", "nullable" : true, "type" : { "name" : "int", "bitWidth" : 32, "isSigned" : true }, "children" : [ ] } ] }
另外,就像Avro一样,您可以在字段上设计复杂的架构和嵌入式值:
public static Schema schemaWithChildren() { var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null); var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null); var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency)); return new Schema(List.of(itemField)); }
基于上面的的Schema,我们将为我们的类创建一个DTO:
package com.gkatzioura.arrow; import lombok.Builder; import lombok.Data; @Data @Builder public class DefaultArrowEntry { private String col1; private Integer col2; }
我们的目标是将这些Java对象转换为Arrow字节流。
1. 使用分配器创建 DirectByteBuffer
这些缓冲区是 堆外的 。您确实需要释放所使用的内存,但是对于库用户而言,这是通过在分配器上执行 close() 操作来完成的。在我们的例子中,我们的类将实现 Closeable 接口,该接口将执行分配器关闭操作。
通过使用流api,数据将被流传输到使用Arrow格式提交的OutPutStream:
package com.gkatzioura.arrow; import java.io.Closeable; import java.io.IOException; import java.nio.channels.WritableByteChannel; import java.util.List; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.util.Text; import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA; public class DefaultEntriesWriter implements Closeable { private final RootAllocator rootAllocator; private final VectorSchemaRoot vectorSchemaRoot;//向量分配器创建: public DefaultEntriesWriter() { rootAllocator = new RootAllocator(); vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator); } public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) { if (batchSize <= 0) { batchSize = defaultArrowEntries.size(); } DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider(); try(ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) { writer.start(); VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0); IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1); childVector1.reset(); childVector2.reset(); boolean exactBatches = defaultArrowEntries.size()%batchSize == 0; int batchCounter = 0; for(int i=0; i < defaultArrowEntries.size(); i++) { childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1())); childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2()); batchCounter++; if(batchCounter == batchSize) { vectorSchemaRoot.setRowCount(batchSize); writer.writeBatch(); batchCounter = 0; } } if(!exactBatches) { vectorSchemaRoot.setRowCount(batchCounter); writer.writeBatch(); } writer.end(); } catch (IOException e) { throw new ArrowExampleException(e); } } @Override public void close() throws IOException { vectorSchemaRoot.close(); rootAllocator.close(); } }
为了在Arrow上显示批处理的支持,已在函数中实现了简单的批处理算法。对于我们的示例,只需考虑将数据分批写入。
让我们深入了解上面代码功能:
向量分配器创建:
public DefaultEntriesToBytesConverter() { rootAllocator = new RootAllocator(); vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator); }
然后在写入流时,实现并启动了Arrow流编写器
ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, Channels.newChannel(out)); writer.start();
我们将数据填充向量,然后还重置它们,但让预分配的缓冲区 存在 :
VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0); IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1); childVector1.reset(); childVector2.reset();
写入数据时,我们使用 setSafe 操作。如果需要分配更多的缓冲区,应采用这种方式。对于此示例,此操作在每次写入时都完成,但是在考虑了所需的操作和缓冲区大小后可以避免:
childVector1.setSafe(i, new Text(defaultArrowEntries.get(i).getCol1())); childVector2.setSafe(i, defaultArrowEntries.get(i).getCol2());
然后,将批处理写入流中:
vectorSchemaRoot.setRowCount(batchSize); writer.writeBatch();
最后但并非最不重要的一点是,我们关闭了writer:
@Override public void close() throws IOException { vectorSchemaRoot.close(); rootAllocator.close(); }
以上就是JVM上高性能数据格式库包Apache Arrow入门和架构详解(Gkatziouras)的详细内容,更多关于Apache Arrow入门的资料请关注开心学习网其它相关文章!
- xampp安装后启动apache弹出对话框(XAMPP下使用顶级域名绑定虚拟主机的配置方法和示例)
- apache服务配置详解(APACHE 多站点配置方法)
- apache持续连接时间设置(Apache增加最大连接数的方法)
- apache漏洞怎么排查(apache urlrewrite防盗链功能配置)
- phpmysql完全学习手册教程(Windows下搭建PHP开发环境Apache+PHP+MySQL)
- nginx反向代理及原理(传说中的反向代理,Nginx+Apache软件配置Web服务器)
- apachephp安装配置教程交流(Apache中利用mod_rewrite实现防盗链)
- apache的优化建议(Apache 网站速度更快)
- apache搭建ftp服务器(使用Apache&花生壳架设Web服务器)
- apache和yii域名配置(Yii 框架控制器创建使用及控制器响应操作示例)
- apache服务部署tomcat(Apache与Tomcat服务器整合的基本配置方法及概要说明)
- apache协议内容(Apache中rewrite伪静态规则介绍)
- 修改mysql安装服务名称(Apache为mysql以及自己的项目设置虚拟路径)
- 自己搭建域名解析服务器(apache 二级域名解析实现方法)
- apache虚拟目录配置(Apache 添加虚拟目录注意事项)
- apache访问提示404(Apache跨域资源访问报错问题解决方案)
- Google 推出了一个游戏生成器,让不会编程的你也能自己设计游戏(推出了一个游戏生成器)
- 二胎家庭老大爱闹情绪,用这招很有效(二胎家庭老大爱闹情绪)
- 一个30岁男人外遇失败的全过程(一个30岁男人外遇失败的全过程)
- 《无敌破坏王2》 不聊彩蛋,聊聊我从动画里看到的现实那些事儿(无敌破坏王2不聊彩蛋)
- 《寄生虫》 三观不正 人类悲欢从来不相通,感同身受也并非本能(寄生虫三观不正)
- 这部动漫中的女孩子,可比101女孩更加励志(这部动漫中的女孩子)
热门推荐
- Ext.query与Ext.select 的用法
- mysql重新安装失败
- html5+css样式代码(详解HTML5中CSS外观属性)
- html5封闭ios(Html5 页面适配iPhoneX就是那么简单)
- wordpress如何在文章中自定义html(wordpress添加Html5的表单验证required方法小结)
- python生成文本文件(python+os根据文件名自动生成文本)
- thinkphp5配置入口路径(ThinkPHP5.1框架数据库链接和增删改查操作示例)
- sql server事务回滚(SQL Server 添加Delete操作回滚日志方式)
- mysql索引原理及使用(再有人问你MySQL索引原理,就把这篇文章甩给他!)
- mysql主从同步工作原理(MySQL是如何实现主备同步)
排行榜
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9