企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 使用 Kudu 开发应用程序 原文链接 : [http://kudu.apache.org/docs/developing.html](http://kudu.apache.org/docs/developing.html) 译文链接 : [http://cwiki.apachecn.org/pages/viewpage.action?pageId=10813629](http://cwiki.apachecn.org/pages/viewpage.action?pageId=10813629) 贡献者 : [小瑶](/display/~chenyao) [ApacheCN](/display/~apachecn) [Apache中文网](/display/~apachechina) ## 使用 Kudu 开发应用程序 **Kudu **提供 **C ++**,**Java** 和 **Python** 客户端 **API** 以及参考示例来说明它们的用途。 注意 不支持使用服务器端或专用接口,不属于公共 **API** 的接口没有稳定性保证。 ## 查看 API 文档 ### C++ API 文档  您可以在线查看 **C ++** 客户端 **API** 文档。或者,在从源代码构建 **Kudu** 后,您还可以另外构建 **doxygen** 目标(例如,如果使用 **make** ,则运行 **make doxygen** ),并通过在您最喜欢的 **Web** 中打开 **docs/doxygen/client_api/html/index.html** 文件来使用本地生成的 **API** 文档浏览器。 重要 为了构建 **doxygen** 目标,有必要在您的构建机器上安装带有 **Dot( graphviz )**支持的 **doxygen** 。如果您从源代码构建 **Kudu** 后安装了 **doxygen** ,则需要再次运行 **cmake** 以获取 **doxygen** 位置并生成适当的目标。 ### Java API 文档  您可以在线查看[ **Java API** 文档](http://kudu.apache.org/apidocs/index.html) 。或者,在构建 [**Java** 客户端](/pages/viewpage.action?pageId=10813629)之后,**Java API** 文档可以在 **java/kudu-client/target/apidocs/index.html** 中找到。 ## 工作实例 [**kudu** 示例](https://github.com/cloudera/kudu-examples) **Github** 仓库中提供了几个示例应用程序。每个示例包括一个自述文件,显示如何编译和运行它。这些例子说明了 **Kudu** **API** 的正确使用,以及如何设置虚拟机来运行 **Kudu** 。以下列表包括今天可用的一些示例。检查存储库本身,以防此列表过期。 **java/java-example** 连接到 **Kudu** 实例的简单 **Java** 应用程序创建一个表,向其中写入数据,然后丢弃该表。 **java/collectl** 侦听 **TCP** 套接字的小型 **Java** 应用程序,用于与 **Collectl** 有线协议相对应的时间序列数据。常用的 **collectl** 工具可用于将示例数据发送到服务器。 **java/insert-loadgen** 生成随机插入负载的 **Java** 应用程序。 **python/dstat-kudu** 一个示例程序,显示如何使用 **Kudu Python API** 将数据加载到由外部程序生成的新的/现有的 **Kudu** 表中, **dstat** 在这种情况下。 **python/graphite-kudu** 使用 **graphite-web** ( 石墨网 ) 与 **Kudu** 作为后端的实验插件。 **demo-vm-setup** 脚本下载并运行带有 **Kudu** 的 **VirtualBox** 虚拟机已安装。有关详细信息,请参阅 [快速入门](/pages/viewpage.action?pageId=10813610) 。 这些例子应该是您自己的 **Kudu** 应用程序和集成的有用起点。 ### Maven Artifacts ( Maven 工件 ) 以下 **Maven &lt;dependency&gt;** 元素对于 **Apache Kudu** 公开版本(从**1.0.0** 开始)是有效的: ``` <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>1.1.0</version> </dependency> ``` **Java** 客户端和各种 **Java** 集成(例如 **Spark** , **Flume** )的便利二进制文件现在也可以通过[ **ASF Maven** 存储库](http://repository.apache.org/) 和[ **Maven Central** 存储库](https://mvnrepository.com/artifact/org.apache.kudu)获得。 ## Impala命令使用 Kudu 的例子 请参阅 [使用 **Impala** 与 **Kudu**](/pages/viewpage.action?pageId=10813620) 进行有关使用 **Kudu** 安装和使用 **Impala** 的指导,其中包括几个 **impala-shell** 示例。 ## Kudu 与 Spark 集成 **Kudu** 从 **1.0.0** 版开始,通过 **Data Source API** 与 **Spark** 集成。使用 **--packages** 选项包括 **kudu-spark** 依赖关系: 如果使用 **Spark** 与 **Scala 2.10** ,请使用 **kudu-spark_2.10** 插件: ``` spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0 ``` 如果在 **Scala 2.11** 中使用 **Spark 2** ,请使用 **kudu-spark2_2.11** 插件: ``` spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.1.0 ``` 然后导入 **kudu-spark** 并创建一个数据框: ``` import org.apache.kudu.spark.kudu._ import org.apache.kudu.client._ import collection.JavaConverters._ // Read a table from Kudu val df = sqlContext.read.options(Map("kudu.master" -> "kudu.master:7051","kudu.table" -> "kudu_table")).kudu // Query using the Spark API... df.select("id").filter("id" >= 5).show() // ...or register a temporary table and use SQL df.registerTempTable("kudu_table") val filteredDF = sqlContext.sql("select id from kudu_table where id >= 5").show() // Use KuduContext to create, delete, or write to Kudu tables val kuduContext = new KuduContext("kudu.master:7051", sqlContext.sparkContext) // Create a new Kudu table from a dataframe schema // NB: No rows from the dataframe are inserted into the table kuduContext.createTable( "test_table", df.schema, Seq("key"), new CreateTableOptions() .setNumReplicas(1) .addHashPartitions(List("key").asJava, 3)) // Insert data kuduContext.insertRows(df, "test_table") // Delete data kuduContext.deleteRows(filteredDF, "test_table") // Upsert data kuduContext.upsertRows(df, "test_table") // Update data val alteredDF = df.select("id", $"count" + 1) kuduContext.updateRows(filteredRows, "test_table" // Data can also be inserted into the Kudu table using the data source, though the methods on KuduContext are preferred // NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert in the options map // NB: Only mode Append is supported df.write.options(Map("kudu.master"-> "kudu.master:7051", "kudu.table"-> "test_table")).mode("append").kudu // Check for the existence of a Kudu table kuduContext.tableExists("another_table") // Delete a Kudu table kuduContext.deleteTable("unwanted_table") <> and OR predicates are not pushed to Kudu, and instead will be evaluated by the Spark task. Only LIKE predicates with a suffix wildcard are pushed to Kudu, meaning that LIKE "FOO%" is pushed down but LIKE "FOO%BAR" isn’t. ``` ### Spark 集成已知问题和限制 * 注册为临时表时,必须为具有大写字母或非 **ASCII** 字符的名称的 **Kudu** 表分配备用名称。 * 具有包含大写字母或非 **ASCII** 字符的列名称的 **Kudu** 表可能不与 **SparkSQL** 一起使用。 **Columns **可能在 **Kudu** 更名为解决这个问题。 * **&lt;&gt;** 和 **OR** 谓词不被推送到 **Kudu** ,而是将在 **Spark** 的evaluate过程中执行, 只有具有后缀通配符的 **LIKE** 谓词才被推送到 **Kudu** 执行,这意味着 **LIKE“FOO%”**语句被下推到**Kudu**,而 **“FOO%BAR”** 这样的语句不会。 * **Kudu** 不支持 **Spark SQL** 支持的所有类型,例如 **Date** , **Decimal** 和复杂类型。 * **Kudu** 表只能在 **SparkSQL** 中注册为临时表。 可能不会使用 **HiveContext** 查询 **Kudu** 表。 ## Kudu Python 客户端 **Kudu Python** 客户端为 **C ++** 客户端 API 提供了一个 **Python** 友好的界面。 下面的示例演示了如何使用部分 **Python** 客户端。 ``` import kudu from kudu.client import Partitioning from datetime import datetime # Connect to Kudu master server client = kudu.connect(host='kudu.master', port=7051) # Define a schema for a new table builder = kudu.schema_builder() builder.add_column('key').type(kudu.int64).nullable(False).primary_key() builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4') schema = builder.build() # Define partitioning schema partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3) # Create new table client.create_table('python-example', schema, partitioning) # Open a table table = client.table('python-example') # Create a new session so that we can apply write operations session = client.new_session() # Insert a row op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()}) session.apply(op) # Upsert a row op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"}) session.apply(op) # Updating a row op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")}) session.apply(op) # Delete a row op = table.new_delete({'key': 2}) session.apply(op) # Flush write operations, if failures occur, capture print them. try: session.flush() except kudu.KuduBadStatus as e: print(session.get_pending_errors()) # Create a scanner and add a predicate scanner = table.scanner() scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1)) # Open Scanner and read all tuples # Note: This doesn't scale for large scans result = scanner.open().read_all_tuples() ``` ## 与 MapReduce , YARN 和其他框架集成 **Kudu** 旨在与 **Hadoop** 生态系统中的 **MapReduce** , **YARN** , **Spark** 和其他框架集成。 请参阅 **[RowCounter.java](https://github.com/apache/kudu/blob/master/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/RowCounter.java)** 和** [ImportCsv.java](https://github.com/apache/kudu/blob/master/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportCsv.java)** ,了解可以对自己的集成进行建模的示例。 请继续关注未来使用 **YARN** 和 **Spark** 的更多示例。