---
title: Phoenix Query Server REST SDK - Azure HDInsight
description: Install and use the REST SDK for the Phoenix Query Server in Azure HDInsight.
ms.service: azure-hdinsight
ms.topic: how-to
ms.custom: "hdinsightactive, devx-track-csharp"
author: apurbasroy
ms.author: apsinhar
ms.reviewer: nijelsf
ms.date: 02/03/2025
---

# Apache Phoenix Query Server REST SDK

[Apache Phoenix](https://phoenix.apache.org/) is an open-source, massively parallel relational database layer built on top of [Apache HBase](apache-hbase-overview.md). Phoenix allows you to run SQL-like queries against HBase through SSH tools such as [SQLLine](apache-hbase-query-with-phoenix.md). Phoenix also provides an HTTP server called Phoenix Query Server (PQS), a thin client that supports two transport mechanisms for client communication: JSON and Protocol Buffers. Protocol Buffers is the default mechanism and offers more efficient communication than JSON.

This article describes how to use the PQS REST SDK to create tables, upsert rows individually and in bulk, and select data using SQL statements. The examples use the [Microsoft .NET driver for Apache Phoenix Query Server](https://www.nuget.org/packages/Microsoft.Phoenix.Client). This SDK is built on [Apache Calcite's Avatica](https://calcite.apache.org/avatica/) APIs, which exclusively use Protocol Buffers as the serialization format. For more information, see the [Apache Calcite Avatica Protocol Buffers Reference](https://calcite.apache.org/avatica/docs/protobuf_reference.html).
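If you're not working in the Visual Studio Package Manager Console, the driver package linked above can also be added to a project with the standard .NET CLI. This is a convenience sketch; only the package name comes from the NuGet link above:

```shell
dotnet add package Microsoft.Phoenix.Client
```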
## Install the SDK

The Microsoft .NET driver for Apache Phoenix Query Server is provided as a NuGet package, which can be installed from the Visual Studio **NuGet Package Manager Console** with the following command:

```console
Install-Package Microsoft.Phoenix.Client
```

## Instantiate new PhoenixClient object

To begin using the library, instantiate a new `PhoenixClient` object, passing in a `ClusterCredentials` object that contains the `Uri` of your cluster and the cluster's Apache Hadoop user name and password.

```csharp
var credentials = new ClusterCredentials(new Uri("https://CLUSTERNAME.azurehdinsight.net/"), "USERNAME", "PASSWORD");
client = new PhoenixClient(credentials);
```

Replace `CLUSTERNAME` with your HDInsight HBase cluster name, and `USERNAME` and `PASSWORD` with the Hadoop credentials specified at cluster creation. The default Hadoop user name is **admin**.

## Generate unique connection identifier

To send one or more requests to PQS, you need to include a unique connection identifier that associates the requests with the connection.

```csharp
string connId = Guid.NewGuid().ToString();
```

Each example first calls the `OpenConnectionRequestAsync` method, passing in the unique connection identifier. Next, define `ConnectionProperties` and `RequestOptions` objects, and pass those objects along with the generated connection identifier to the `ConnectionSyncRequestAsync` method. PQS's `ConnectionSyncRequest` object helps ensure that both the client and the server have a consistent view of the database properties.

## ConnectionSyncRequest and its ConnectionProperties

To call `ConnectionSyncRequestAsync`, pass in a `ConnectionProperties` object.
```csharp
ConnectionProperties connProperties = new ConnectionProperties
{
    HasAutoCommit = true,
    AutoCommit = true,
    HasReadOnly = true,
    ReadOnly = false,
    TransactionIsolation = 0,
    Catalog = "",
    Schema = "",
    IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
```

Here are some properties of interest:

| Property | Description |
| -- | -- |
| `AutoCommit` | A boolean denoting whether `autoCommit` is enabled for Phoenix transactions. |
| `ReadOnly` | A boolean denoting whether the connection is read-only. |
| `TransactionIsolation` | An integer denoting the level of transaction isolation per the JDBC specification. See the following table. |
| `Catalog` | The name of the catalog to use when fetching connection properties. |
| `Schema` | The name of the schema to use when fetching connection properties. |
| `IsDirty` | A boolean denoting whether the properties have been altered. |

Here are the `TransactionIsolation` values:

| Isolation value | Description |
| -- | -- |
| 0 | Transactions aren't supported. |
| 1 | Dirty reads, non-repeatable reads, and phantom reads can occur. |
| 2 | Dirty reads are prevented, but non-repeatable reads and phantom reads can occur. |
| 4 | Dirty reads and non-repeatable reads are prevented, but phantom reads can occur. |
| 8 | Dirty reads, non-repeatable reads, and phantom reads are all prevented. |

## Create a new table

HBase, like any other RDBMS, stores data in tables. Phoenix uses standard SQL queries to create new tables, while defining the primary key and column types.

This example, and all later examples, use the instantiated `PhoenixClient` object as defined in [Instantiate new PhoenixClient object](#instantiate-new-phoenixclient-object).
```csharp
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();

// You can set certain request options, such as timeout in milliseconds:
options.TimeoutMillis = 300000;

// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";

CreateStatementResponse createStatementResponse = null;
OpenConnectionResponse openConnResponse = null;
try
{
    // Open the connection
    var info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);

    // Sync the connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);

    // Create the statement
    createStatementResponse = await client.CreateStatementRequestAsync(connId, options);

    // Create the table if it does not exist
    string sql = "CREATE TABLE IF NOT EXISTS Customers (Id varchar(20) PRIMARY KEY, FirstName varchar(50), " +
        "LastName varchar(100), StateProvince char(2), Email varchar(255), Phone varchar(15))";
    await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId,
        long.MaxValue, int.MaxValue, options);
    Console.WriteLine("Table \"Customers\" created.");
}
catch (Exception e)
{
    Console.WriteLine(e);
    throw;
}
finally
{
    if (createStatementResponse != null)
    {
        await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
        createStatementResponse = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
```

The preceding example creates a new table named `Customers` using the `IF NOT EXISTS` option. The `CreateStatementRequestAsync` call creates a new statement on the Avatica (PQS) server. The `finally` block closes the returned `CreateStatementResponse` and `OpenConnectionResponse` objects.

## Insert data individually

This example shows an individual data insert, referencing a `List<string>` collection of American state and territory abbreviations:

```csharp
var states = new List<string>
{
    "AL", "AK", "AS", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FM", "FL", "GA", "GU",
    "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MH", "MD", "MA", "MI", "MN",
    "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "MP", "OH", "OK",
    "OR", "PW", "PA", "PR", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VI", "VA", "WA",
    "WV", "WI", "WY"
};
```

The table's `StateProvince` column value is used in a later select operation.

```csharp
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;

// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";

OpenConnectionResponse openConnResponse = null;
StatementHandle statementHandle = null;
try
{
    // Open the connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);

    // Sync the connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);

    string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
    PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, 100, options);
    statementHandle = prepareResponse.Statement;

    var r = new Random();
    // Insert 300 rows
    for (int i = 0; i < 300; i++)
    {
        var list = new pbc.RepeatedField<TypedValue>();
        var id = new TypedValue { StringValue = "id" + i, Type = Rep.String };
        var firstName = new TypedValue { StringValue = "first" + i, Type = Rep.String };
        var lastName = new TypedValue { StringValue = "last" + i, Type = Rep.String };
        var state = new TypedValue { StringValue = states.ElementAt(r.Next(0, states.Count)), Type = Rep.String };
        var email = new TypedValue { StringValue = $"email{i}@junkemail.com", Type = Rep.String };
        var phone = new TypedValue { StringValue = $"555-229-341{i.ToString().Substring(0, 1)}", Type = Rep.String };
        list.Add(id);
        list.Add(firstName);
        list.Add(lastName);
        list.Add(state);
        list.Add(email);
        list.Add(phone);

        Console.WriteLine("Inserting customer " + i);
        await client.ExecuteRequestAsync(statementHandle, list, long.MaxValue, true, options);
    }
    await client.CommitRequestAsync(connId, options);
    Console.WriteLine("Upserted customer data");
}
catch (Exception ex)
{
    Console.WriteLine(ex);
    throw;
}
finally
{
    if (statementHandle != null)
    {
        await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
        statementHandle = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
```

The structure for executing an insert statement is similar to creating a new table. At the end of the `try` block, the transaction is explicitly committed. This example repeats an insert transaction 300 times. The following example shows a more efficient batch insert process.

## Batch insert data

The following code is nearly identical to the code for inserting data individually. This example uses the `UpdateBatch` object in a call to `ExecuteBatchRequestAsync`, rather than repeatedly calling `ExecuteRequestAsync` with a prepared statement.
```csharp
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;

// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";

OpenConnectionResponse openConnResponse = null;
CreateStatementResponse createStatementResponse = null;
try
{
    // Open the connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);

    // Sync the connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);

    // Create the statement
    createStatementResponse = await client.CreateStatementRequestAsync(connId, options);

    string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
    PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, long.MaxValue, options);
    var statementHandle = prepareResponse.Statement;

    var updates = new pbc.RepeatedField<UpdateBatch>();
    var r = new Random();
    // Insert 300 rows
    for (int i = 300; i < 600; i++)
    {
        var list = new pbc.RepeatedField<TypedValue>();
        var id = new TypedValue { StringValue = "id" + i, Type = Rep.String };
        var firstName = new TypedValue { StringValue = "first" + i, Type = Rep.String };
        var lastName = new TypedValue { StringValue = "last" + i, Type = Rep.String };
        var state = new TypedValue { StringValue = states.ElementAt(r.Next(0, states.Count)), Type = Rep.String };
        var email = new TypedValue { StringValue = $"email{i}@junkemail.com", Type = Rep.String };
        var phone = new TypedValue { StringValue = $"555-229-341{i.ToString().Substring(0, 1)}", Type = Rep.String };
        list.Add(id);
        list.Add(firstName);
        list.Add(lastName);
        list.Add(state);
        list.Add(email);
        list.Add(phone);

        var batch = new UpdateBatch { ParameterValues = list };
        updates.Add(batch);
        Console.WriteLine($"Added customer {i} to batch");
    }

    var executeBatchResponse = await client.ExecuteBatchRequestAsync(connId, statementHandle.Id, updates, options);
    Console.WriteLine("Batch upserted customer data");
}
catch (Exception ex)
{
    Console.WriteLine(ex);
    throw;
}
finally
{
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
```

In one test environment, individually inserting 300 new records took almost two minutes. In contrast, inserting 300 records as a batch required only six seconds.

## Select data

This example shows how to reuse one connection to execute multiple queries:

1. Select all records, and then fetch the remaining records after the default maximum of 100 are returned.
2. Use a total row count select statement to retrieve the single scalar result.
3. Execute a select statement that returns the total number of customers per state or territory.
```csharp
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();

// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";

OpenConnectionResponse openConnResponse = null;
CreateStatementResponse createStatementResponse = null;
try
{
    // Open the connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);

    // Sync the connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);

    createStatementResponse = await client.CreateStatementRequestAsync(connId, options);

    string sql = "SELECT * FROM Customers";
    ExecuteResponse executeResponse = await client.PrepareAndExecuteRequestAsync(connId, sql,
        createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    pbc::RepeatedField<Row> rows = executeResponse.Results[0].FirstFrame.Rows;

    // Loop through all of the returned rows and display the first two columns
    for (int i = 0; i < rows.Count; i++)
    {
        Row row = rows[i];
        Console.WriteLine(row.Value[0].ScalarValue.StringValue + " " + row.Value[1].ScalarValue.StringValue);
    }

    // 100 is hard-coded on the server side as the default firstframe size
    // FetchRequestAsync is called to get any remaining rows
    Console.WriteLine("");
    Console.WriteLine($"Number of rows: {rows.Count}");

    // Fetch remaining rows; the offset is not used, so simply set it to 0
    // When FetchResponse.Frame.Done is true, all rows were fetched
    FetchResponse fetchResponse = await client.FetchRequestAsync(connId,
        createStatementResponse.StatementId, 0, int.MaxValue, options);
    Console.WriteLine($"Frame row count: {fetchResponse.Frame.Rows.Count}");
    Console.WriteLine($"Fetch response is done: {fetchResponse.Frame.Done}");
    Console.WriteLine("");

    // Run query 2
    string sql2 = "select count(*) from Customers";
    ExecuteResponse countResponse = await client.PrepareAndExecuteRequestAsync(connId, sql2,
        createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    long count = countResponse.Results[0].FirstFrame.Rows[0].Value[0].ScalarValue.NumberValue;
    Console.WriteLine($"Total customer records: {count}");
    Console.WriteLine("");

    // Run query 3
    string sql3 = "select StateProvince, count(*) as Number from Customers group by StateProvince order by Number desc";
    ExecuteResponse groupByResponse = await client.PrepareAndExecuteRequestAsync(connId, sql3,
        createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    pbc::RepeatedField<Row> stateRows = groupByResponse.Results[0].FirstFrame.Rows;
    for (int i = 0; i < stateRows.Count; i++)
    {
        Row row = stateRows[i];
        Console.WriteLine(row.Value[0].ScalarValue.StringValue + ": " + row.Value[1].ScalarValue.NumberValue);
    }
}
catch (Exception ex)
{
    Console.WriteLine(ex);
    throw;
}
finally
{
    if (createStatementResponse != null)
    {
        await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
        createStatementResponse = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
```

The output of the `select` statements should be the following result:

```output
id0 first0
id1 first1
id10 first10
id100 first100
id101 first101
id102 first102
.
.
.
id185 first185
id186 first186
id187 first187
id188 first188

Number of rows: 100
Frame row count: 500
Fetch response is done: True

Total customer records: 600

NJ: 21
CA: 19
GU: 17
NC: 16
IN: 16
MA: 16
AZ: 16
ME: 16
IL: 15
OR: 15
.
.
.
MO: 10
HI: 10
GA: 10
DC: 9
NM: 9
MD: 9
MP: 9
SC: 7
AR: 7
MH: 6
FM: 5
```

## Next steps

* [Apache Phoenix in HDInsight](../hdinsight-phoenix-in-hdinsight.md)
* [Using the Apache HBase REST SDK](apache-hbase-rest-sdk.md)