High performance reactive PostgreSQL client written in Java
The Reactive Postgres Client is a client for Postgres with a straightforward API focusing on scalability and low overhead.
The client is reactive and non-blocking, which allows it to handle many database connections with a single thread.
It also supports publish/subscribe using Postgres NOTIFY/LISTEN.
To use the Reactive Postgres Client add the following dependency to the dependencies section of your build descriptor:
Maven (in your pom.xml file):
<dependency>
<groupId>io.reactiverse</groupId>
<artifactId>reactive-pg-client</artifactId>
<version>0.11.4</version>
</dependency>
Gradle (in your build.gradle file):
dependencies {
compile 'io.reactiverse:reactive-pg-client:0.11.4'
}
Here is the simplest way to connect, query and disconnect
// Pool options
def options = [
port:5432,
host:"the-host",
database:"the-db",
user:"user",
password:"secret",
maxSize:5
]
// Create the client pool
def client = PgClient.pool(options)
// A simple query
client.query("SELECT * FROM users WHERE id='julien'", { ar ->
if (ar.succeeded()) {
def result = ar.result()
println("Got ${result.size()} rows ")
} else {
println("Failure: ${ar.cause().getMessage()}")
}
// Now close the pool
client.close()
})
Most of the time you will use a pool to connect to Postgres:
// Pool options
def options = [
port:5432,
host:"the-host",
database:"the-db",
user:"user",
password:"secret",
maxSize:5
]
// Create the pooled client
def client = PgClient.pool(options)
The pooled client uses a connection pool and any operation will borrow a connection from the pool to execute the operation and release it to the pool.
If you are running with Vert.x you can pass it your Vertx instance:
// Pool options
def options = [
port:5432,
host:"the-host",
database:"the-db",
user:"user",
password:"secret",
maxSize:5
]
// Create the pooled client
def client = PgClient.pool(vertx, options)
You need to release the pool when you don’t need it anymore:
// Close the pool and all the associated resources
pool.close()
When you need to execute several operations on the same connection, you need to use a client connection.
You can easily get one from the pool:
// Pool options
def options = [
port:5432,
host:"the-host",
database:"the-db",
user:"user",
password:"secret",
maxSize:5
]
// Create the pooled client
def client = PgClient.pool(vertx, options)
// Get a connection from the pool
client.getConnection({ ar1 ->
if (ar1.succeeded()) {
println("Connected")
// Obtain our connection
def conn = ar1.result()
// All operations execute on the same connection
conn.query("SELECT * FROM users WHERE id='julien'", { ar2 ->
if (ar2.succeeded()) {
conn.query("SELECT * FROM users WHERE id='emad'", { ar3 ->
// Release the connection to the pool
conn.close()
})
} else {
// Release the connection to the pool
conn.close()
}
})
} else {
println("Could not connect: ${ar1.cause().getMessage()}")
}
})
Once you are done with the connection you must close it to release it to the pool, so it can be reused.
Sometimes you want to improve performance by connecting through a Unix domain socket. This is achieved with Vert.x native transports.
Make sure you have added the required netty-transport-native dependency to your classpath and enabled the Unix domain socket option.
// Pool Options
// Socket file name will be /var/run/postgresql/.s.PGSQL.5432
def options = [
host:"/var/run/postgresql",
port:5432,
database:"the-db"
]
// Create the pooled client
def client = PgClient.pool(options)
// Create the pooled client with a vertx instance
// Make sure the vertx instance has enabled native transports
def client2 = PgClient.pool(vertx, options)
More information can be found in the Vert.x documentation.
There are several options for you to configure the client.
Apart from configuring with a PgPoolOptions data object, you can also connect using a connection URI:
// Connection URI
def connectionUri = "postgresql://dbuser:secretpassword@database.server.com:3211/mydb"
// Create the pool from the connection URI
def pool = PgClient.pool(connectionUri)
// Create the connection from the connection URI
PgClient.connect(vertx, connectionUri, { res ->
// Handling your connection
})
More information about connection string formats can be found in the PostgreSQL Manuals.
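To make the parts of such a connection URI concrete, here is a minimal Java sketch that splits the example URI with the JDK's `java.net.URI`. It is an illustration only and not the client's own parsing code; the class name `ConnectionUriSketch` and its `parts` helper are hypothetical.

```java
import java.net.URI;

// Illustration only: splits a PostgreSQL connection URI with the JDK parser.
// This is not how the client itself parses the URI.
class ConnectionUriSketch {

    // Returns {user, password, host, port, database}; missing parts are null
    // (an undefined port is reported by java.net.URI as -1).
    static String[] parts(String connectionUri) {
        URI uri = URI.create(connectionUri);
        String userInfo = uri.getUserInfo();   // "user:password", or null
        String user = null;
        String password = null;
        if (userInfo != null) {
            int colon = userInfo.indexOf(':');
            user = colon < 0 ? userInfo : userInfo.substring(0, colon);
            password = colon < 0 ? null : userInfo.substring(colon + 1);
        }
        String path = uri.getPath();           // "/mydb"
        String database = (path == null || path.length() <= 1) ? null : path.substring(1);
        return new String[] {
            user, password, uri.getHost(), String.valueOf(uri.getPort()), database
        };
    }

    public static void main(String[] args) {
        String[] p = parts("postgresql://dbuser:secretpassword@database.server.com:3211/mydb");
        System.out.println("user=" + p[0] + " host=" + p[2] + " port=" + p[3] + " database=" + p[4]);
    }
}
```

The sketch ignores query parameters and percent-encoding, which a real connection string may contain.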
You can also use environment variables to set default connection values, which is useful when you want to avoid hard-coding database connection information. You can refer to the official documentation for more details. The following parameters are supported:
PGHOST
PGHOSTADDR
PGPORT
PGDATABASE
PGUSER
PGPASSWORD
PGSSLMODE
If you connect without specifying a data object or a connection URI string, the connection settings are read from these environment variables.
$ PGUSER=user \
PGHOST=the-host \
PGPASSWORD=secret \
PGDATABASE=the-db \
PGPORT=5432 \
PGSSLMODE=DISABLE
// Create the pool from the environment variables
def pool = PgClient.pool()
// Create the connection from the environment variables
PgClient.connect(vertx, { res ->
// Handling your connection
})
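As a rough sketch of what reading connection settings from the environment can look like, the Java snippet below builds an options map from a set of environment variables. The fallback host and port, the class name `EnvDefaultsSketch`, and the omission of PGHOSTADDR are assumptions made for the illustration; the client's real defaults and lookup logic may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: maps PG* environment variables to an options map.
class EnvDefaultsSketch {
    // Hypothetical fallbacks used when a variable is not set; not taken from the client.
    static final String FALLBACK_HOST = "localhost";
    static final int FALLBACK_PORT = 5432;

    static Map<String, Object> resolve(Map<String, String> env) {
        Map<String, Object> options = new HashMap<>();
        options.put("host", env.getOrDefault("PGHOST", FALLBACK_HOST));
        String port = env.get("PGPORT");
        options.put("port", port == null ? FALLBACK_PORT : Integer.parseInt(port));
        // Optional settings are copied only when the variable is present
        copyIfSet(env, "PGDATABASE", options, "database");
        copyIfSet(env, "PGUSER", options, "user");
        copyIfSet(env, "PGPASSWORD", options, "password");
        copyIfSet(env, "PGSSLMODE", options, "sslMode");
        return options;
    }

    private static void copyIfSet(Map<String, String> env, String name,
                                  Map<String, Object> options, String key) {
        String value = env.get(name);
        if (value != null) {
            options.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> options = resolve(System.getenv());
        System.out.println("host=" + options.get("host") + " port=" + options.get("port"));
    }
}
```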
When you don’t need a transaction, or you only run single queries, you can run queries directly on the pool; the pool will use one of its connections to run the query and return the result to you.
Here is how to run simple queries:
client.query("SELECT * FROM users WHERE id='julien'", { ar ->
if (ar.succeeded()) {
def result = ar.result()
println("Got ${result.size()} rows ")
} else {
println("Failure: ${ar.cause().getMessage()}")
}
})
You can do the same with prepared queries.
The SQL string can refer to parameters by position, using $1, $2, etc.
// Single quotes keep Groovy from treating $1 as a GString placeholder
client.preparedQuery('SELECT * FROM users WHERE id=$1', Tuple.of("julien"), { ar ->
if (ar.succeeded()) {
def rows = ar.result()
println("Got ${rows.size()} rows ")
} else {
println("Failure: ${ar.cause().getMessage()}")
}
})
Query methods provide an asynchronous PgRowSet instance that works for SELECT queries
client.preparedQuery("SELECT first_name, last_name FROM users", { ar ->
if (ar.succeeded()) {
def rows = ar.result()
rows.each { row ->
println("User ${row.getString(0)} ${row.getString(1)}")
}
} else {
println("Failure: ${ar.cause().getMessage()}")
}
})
or UPDATE/INSERT queries:
client.preparedQuery('INSERT INTO users (first_name, last_name) VALUES ($1, $2)', Tuple.of("Julien", "Viet"), { ar ->
if (ar.succeeded()) {
def rows = ar.result()
println(rows.rowCount())
} else {
println("Failure: ${ar.cause().getMessage()}")
}
})
The Row gives you access to your data by index
println("User ${row.getString(0)} ${row.getString(1)}")
or by name
println("User ${row.getString("first_name")} ${row.getString("last_name")}")
You can access a wide variety of types
def firstName = row.getString("first_name")
def male = row.getBoolean("male")
def age = row.getInteger("age")
// ...
You can execute a prepared batch:
// Add commands to the batch
def batch = []
batch.add(Tuple.of("julien", "Julien Viet"))
batch.add(Tuple.of("emad", "Emad Alblueshi"))
// Execute the prepared batch
client.preparedBatch('INSERT INTO USERS (id, name) VALUES ($1, $2)', batch, { res ->
if (res.succeeded()) {
// Process rows
def rows = res.result()
} else {
println("Batch failed ${res.cause()}")
}
})
You can cache prepared queries:
// Enable prepared statement caching
options.cachePreparedStatements = true
def client = PgClient.pool(vertx, options)
You can fetch generated keys with a ‘RETURNING’ clause in your query:
client.preparedQuery('INSERT INTO color (color_name) VALUES ($1), ($2), ($3) RETURNING color_id', Tuple.of("white", "red", "blue"), { ar ->
if (ar.succeeded()) {
def rows = ar.result()
println(rows.rowCount())
rows.each { row ->
println("generated key: ${row.getInteger("color_id")}")
}
} else {
println("Failure: ${ar.cause().getMessage()}")
}
})
When you need to execute sequential queries (without a transaction), you can create a new connection or borrow one from the pool:
Code not translatable
Prepared queries can be created:
connection.prepare('SELECT * FROM users WHERE first_name LIKE $1', { ar1 ->
if (ar1.succeeded()) {
def pq = ar1.result()
pq.execute(Tuple.of("julien"), { ar2 ->
if (ar2.succeeded()) {
// All rows
def rows = ar2.result()
}
})
}
})
NOTE: prepared query caching depends on setCachePreparedStatements and does not depend on whether you create prepared queries or use direct prepared queries.
PgPreparedQuery can perform efficient batching:
connection.prepare('INSERT INTO USERS (id, name) VALUES ($1, $2)', { ar1 ->
if (ar1.succeeded()) {
def prepared = ar1.result()
// Create the batch: one tuple of bound parameters per command
def batch = []
// Add commands to the batch
batch.add(Tuple.of("julien", "Julien Viet"))
batch.add(Tuple.of("emad", "Emad Alblueshi"))
prepared.batch(batch, { res ->
if (res.succeeded()) {
// Process rows
def rows = res.result()
} else {
println("Batch failed ${res.cause()}")
}
})
}
})
You can execute transactions using SQL BEGIN/COMMIT/ROLLBACK; if you do so, you must use a PgConnection and manage it yourself.
Or you can use the transaction API of PgConnection:
Code not translatable
When Postgres reports that the current transaction has failed (e.g. the infamous "current transaction is aborted, commands ignored until end of transaction block"), the transaction is rolled back and the abortHandler is called:
pool.getConnection({ res ->
if (res.succeeded()) {
// Transaction must use a connection
def conn = res.result()
// Begin the transaction
def tx = conn.begin().abortHandler({ v ->
println("Transaction failed => rollbacked")
})
conn.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')", { ar ->
// Works fine of course
if (ar.succeeded()) {
} else {
tx.rollback()
conn.close()
}
})
conn.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')", { ar ->
// Fails and triggers transaction aborts
})
// Attempt to commit the transaction
tx.commit({ ar ->
// But transaction abortion fails it
// Return the connection to the pool
conn.close()
})
}
})
When you use a pool, you can start a transaction directly on the pool.
It borrows a connection from the pool, begins the transaction and releases the connection to the pool when the transaction ends.
Code not translatable
By default, prepared query execution fetches all rows. You can use a PgCursor to control the number of rows you want to read:
connection.prepare('SELECT * FROM users WHERE first_name LIKE $1', { ar1 ->
if (ar1.succeeded()) {
def pq = ar1.result()
// Cursors must run within a transaction
def tx = connection.begin()
// Create a cursor
def cursor = pq.cursor(Tuple.of("julien"))
// Read 50 rows
cursor.read(50, { ar2 ->
if (ar2.succeeded()) {
def rows = ar2.result()
// Check for more ?
if (cursor.hasMore()) {
// Repeat the process...
} else {
// No more rows - commit the transaction
tx.commit()
}
}
})
}
})
PostgreSQL destroys cursors at the end of a transaction, so the cursor API must be used within a transaction; otherwise you will likely get the 34000 PostgreSQL error.
Cursors shall be closed when they are released prematurely:
cursor.read(50, { ar2 ->
if (ar2.succeeded()) {
// Close the cursor
cursor.close()
}
})
A stream API is also available for cursors, which can be more convenient, especially with the Rxified version.
connection.prepare('SELECT * FROM users WHERE first_name LIKE $1', { ar1 ->
if (ar1.succeeded()) {
def pq = ar1.result()
// Streams must run within a transaction
def tx = connection.begin()
// Fetch 50 rows at a time
def stream = pq.createStream(50, Tuple.of("julien"))
// Use the stream
stream.exceptionHandler({ err ->
println("Error: ${err.getMessage()}")
})
stream.endHandler({ v ->
tx.commit()
println("End of stream")
})
stream.handler({ row ->
println("User: ${row.getString("last_name")}")
})
}
})
The stream reads the rows in batches of 50 and streams them; when the rows have been passed to the handler, a new batch of 50 is read, and so on.
The stream can be paused or resumed; while it is paused, the loaded rows remain in memory until they are delivered and the cursor stops iterating.
Currently the client supports the following Postgres types
BOOLEAN (java.lang.Boolean)
INT2 (java.lang.Short)
INT4 (java.lang.Integer)
INT8 (java.lang.Long)
FLOAT4 (java.lang.Float)
FLOAT8 (java.lang.Double)
CHAR (java.lang.String)
VARCHAR (java.lang.String)
TEXT (java.lang.String)
ENUM (java.lang.String)
NAME (java.lang.String)
SERIAL2 (java.lang.Short)
SERIAL4 (java.lang.Integer)
SERIAL8 (java.lang.Long)
NUMERIC (io.reactiverse.pgclient.data.Numeric)
UUID (java.util.UUID)
DATE (java.time.LocalDate)
TIME (java.time.LocalTime)
TIMETZ (java.time.OffsetTime)
TIMESTAMP (java.time.LocalDateTime)
TIMESTAMPTZ (java.time.OffsetDateTime)
INTERVAL (io.reactiverse.pgclient.data.Interval)
BYTEA (io.vertx.core.buffer.Buffer)
JSON (io.reactiverse.pgclient.data.Json)
JSONB (io.reactiverse.pgclient.data.Json)
POINT (io.reactiverse.pgclient.data.Point)
LINE (io.reactiverse.pgclient.data.Line)
LSEG (io.reactiverse.pgclient.data.LineSegment)
BOX (io.reactiverse.pgclient.data.Box)
PATH (io.reactiverse.pgclient.data.Path)
POLYGON (io.reactiverse.pgclient.data.Polygon)
CIRCLE (io.reactiverse.pgclient.data.Circle)
Tuple decoding uses the above types when storing values; it also performs on-the-fly conversion of the actual value when possible:
pool.query("SELECT 1::BIGINT \"VAL\"", { ar ->
def rowSet = ar.result()
def row = rowSet.iterator().next()
// Stored as java.lang.Long
def value = row.getValue(0)
// Convert to java.lang.Integer
def intValue = row.getInteger(0)
})
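The on-the-fly conversion can be pictured with plain Java numeric types. The sketch below is an assumption about the general idea, not the client's code: a value stored as java.lang.Long is narrowed to an Integer through java.lang.Number. The class name `NumberConversionSketch` is hypothetical.

```java
// Illustration only: converts a stored numeric value to Integer on request.
class NumberConversionSketch {

    // Returns the value as an Integer, or null when it is not numeric.
    // Note that intValue() silently truncates values outside the int range.
    static Integer asInteger(Object stored) {
        if (stored instanceof Integer) {
            return (Integer) stored;
        }
        if (stored instanceof Number) {
            return ((Number) stored).intValue();
        }
        return null;
    }

    public static void main(String[] args) {
        Object stored = Long.valueOf(1L);          // what a BIGINT column would hold
        System.out.println(asInteger(stored));     // the same value seen as an Integer
    }
}
```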
Tuple encoding uses the above type mapping for encoding, unless the type is numeric, in which case java.lang.Number is used instead:
pool.query("SELECT 1::BIGINT \"VAL\"", { ar ->
def rowSet = ar.result()
def row = rowSet.iterator().next()
// Stored as java.lang.Long
def value = row.getValue(0)
// Convert to java.lang.Integer
def intValue = row.getInteger(0)
})
Arrays of these types are supported.
The Json Java type is used to represent the Postgres JSON and JSONB types.
The main reason for this type is handling null JSON values.
// Create a tuple
def tuple = Tuple.of(Json.create(Json.create(null)), Json.create(Json.create([
foo:"bar"
])), Json.create(Json.create(null)))
// Retrieving json
def value = tuple.getJson(0).value()
//
value = tuple.getJson(1).value()
// The third element is at index 2
value = tuple.getJson(2).value()
The Numeric Java type is used to represent the Postgres NUMERIC type.
def numeric = row.getNumeric("value")
if (numeric.isNaN()) {
// Handle NaN
} else {
def value = numeric.bigDecimalValue()
}
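One plausible reason for the NaN check is that java.math.BigDecimal cannot represent NaN, while the Postgres NUMERIC type can. The following Java sketch, using only the JDK, shows that constructing a BigDecimal from NaN fails; the class name `NumericNaNSketch` is hypothetical and the snippet does not use the client's Numeric type.

```java
import java.math.BigDecimal;

// Illustration only: BigDecimal has no representation for NaN or infinities.
class NumericNaNSketch {

    // Returns true when BigDecimal can hold the given double value.
    static boolean fitsInBigDecimal(double value) {
        try {
            new BigDecimal(value);
            return true;
        } catch (NumberFormatException e) {
            // Thrown by the BigDecimal(double) constructor for NaN and infinite values
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("1.5 fits: " + fitsInBigDecimal(1.5));
        System.out.println("NaN fits: " + fitsInBigDecimal(Double.NaN));
    }
}
```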
Arrays are available on Tuple and Row:
Code not translatable
Strings are used to represent custom types, both sent to and returned from Postgres.
You can read from Postgres and get the custom type as a string
client.preparedQuery('SELECT address, (address).city FROM address_book WHERE id=$1', Tuple.of(3), { ar ->
if (ar.succeeded()) {
def rows = ar.result()
rows.each { row ->
println("Full Address ${row.getString(0)}, City ${row.getString(1)}")
}
} else {
println("Failure: ${ar.cause().getMessage()}")
}
})
You can also write to Postgres by providing a string
client.preparedQuery('INSERT INTO address_book (id, address) VALUES ($1, $2)', Tuple.of(3, "('Anytown', 'Second Ave', false)"), { ar ->
if (ar.succeeded()) {
def rows = ar.result()
println(rows.rowCount())
} else {
println("Failure: ${ar.cause().getMessage()}")
}
})
You can use Java collectors with the query API:
Code not translatable
The collector processing must not keep a reference on the Row, as there is a single row used for processing the entire set.
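The pitfall can be shown without the client at all. In the Java sketch below, `MutableRow` is a hypothetical stand-in for a row object that is reused between rows; keeping references to it loses data, while copying the values out does not.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: why holding on to a reused row object is a bug.
class ReusedRowSketch {

    // Hypothetical stand-in for a row instance that is reused for every row.
    static final class MutableRow {
        String name;
    }

    // Wrong: stores the shared row itself, so every element shows the last value.
    static List<MutableRow> keepReferences(String[] names) {
        MutableRow shared = new MutableRow();
        List<MutableRow> out = new ArrayList<>();
        for (String n : names) {
            shared.name = n;
            out.add(shared);
        }
        return out;
    }

    // Right: copies the value out of the row while that row is current.
    static List<String> copyValues(String[] names) {
        MutableRow shared = new MutableRow();
        List<String> out = new ArrayList<>();
        for (String n : names) {
            shared.name = n;
            out.add(shared.name);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] names = {"julien", "emad"};
        System.out.println(keepReferences(names).get(0).name);
        System.out.println(copyValues(names).get(0));
    }
}
```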
The Java Collectors class provides many useful predefined collectors; for example, you can easily create a string directly from the row set:
Code not translatable
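As a stand-in for the idea, here is a minimal Java sketch of the JDK's `Collectors.joining` applied to plain strings rather than to a row set. The class name `JoiningCollectorSketch` and the sample values are hypothetical, and how a collector is passed to the query API is not shown here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustration only: a predefined JDK collector that builds one string.
class JoiningCollectorSketch {

    // Joins the given values with a comma, the way a joining collector
    // would combine one column value per row.
    static String join(List<String> lastNames) {
        return lastNames.stream().collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(join(Arrays.asList("Viet", "Alblueshi")));
    }
}
```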
The rxified API supports RxJava 1 and RxJava 2; the following examples use RxJava 2.
Most asynchronous constructs are available as methods prefixed by rx:
Code not translatable
RxJava 2 supports Observable and Flowable types; these are exposed using the PgStream that you can get from a PgPreparedQuery:
Code not translatable
The same example using Flowable:
Code not translatable
The simplified transaction API makes it easy to write transactional asynchronous flows:
Code not translatable
Postgres supports pub/sub communication channels.
You can set a notificationHandler to receive Postgres notifications:
connection.notificationHandler({ notification ->
println("Received ${notification.payload} on channel ${notification.channel}")
})
// The channel name contains a hyphen, so it must be a quoted identifier
connection.query('LISTEN "some-channel"', { ar ->
println("Subscribed to channel")
})
The PgSubscriber is a channel manager managing a single connection that provides per-channel subscription:
def subscriber = PgSubscriber.subscriber(vertx, [
port:5432,
host:"the-host",
database:"the-db",
user:"user",
password:"secret"
])
// You can set the channel before connect
subscriber.channel("channel1").handler({ payload ->
println("Received ${payload}")
})
subscriber.connect({ ar ->
if (ar.succeeded()) {
// Or you can set the channel after connect
subscriber.channel("channel2").handler({ payload ->
println("Received ${payload}")
})
}
})
The channel name that is given to the channel method will be the exact name of the channel as held by Postgres for sending notifications. Note this is different from the representation of the channel name in SQL, and internally PgSubscriber will prepare the submitted channel name as a quoted identifier:
def subscriber = PgSubscriber.subscriber(vertx, [
port:5432,
host:"the-host",
database:"the-db",
user:"user",
password:"secret"
])
subscriber.connect({ ar ->
if (ar.succeeded()) {
// Complex channel name - name in PostgreSQL requires a quoted ID
subscriber.channel("Complex.Channel.Name").handler({ payload ->
println("Received ${payload}")
})
subscriber.channel("Complex.Channel.Name").subscribeHandler({ subscribed ->
subscriber.actualConnection().query("NOTIFY \"Complex.Channel.Name\", 'msg'", { notified ->
println("Notified \"Complex.Channel.Name\"")
})
})
// PostgreSQL simple ID's are forced lower-case
subscriber.channel("simple_channel").handler({ payload ->
println("Received ${payload}")
})
subscriber.channel("simple_channel").subscribeHandler({ subscribed ->
// The following simple channel identifier is forced to lower case
subscriber.actualConnection().query("NOTIFY Simple_CHANNEL, 'msg'", { notified ->
println("Notified simple_channel")
})
})
// The following channel name is longer than the current
// (NAMEDATALEN = 64) - 1 == 63 character limit and will be truncated
subscriber.channel("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbb").handler({ payload ->
println("Received ${payload}")
})
}
})
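The three identifier rules exercised above (quoting, lower-case folding of unquoted names, and truncation to 63 characters) can be sketched in plain Java. This is an illustration of the PostgreSQL rules, not PgSubscriber's implementation; the class `ChannelNameSketch` and its method names are hypothetical, and truncation assumes ASCII names so that one character is one byte.

```java
import java.util.Locale;

// Illustration only: PostgreSQL identifier handling for channel names.
class ChannelNameSketch {
    static final int MAX_LENGTH = 63;   // NAMEDATALEN - 1

    // Quotes a name so SQL keeps its case and special characters;
    // embedded double quotes are doubled.
    static String quote(String name) {
        return "\"" + name.replace("\"", "\"\"") + "\"";
    }

    // Mimics the folding applied to an unquoted identifier.
    static String foldUnquoted(String identifier) {
        return identifier.toLowerCase(Locale.ROOT);
    }

    // Truncates an over-long name (ASCII assumed).
    static String truncate(String name) {
        return name.length() <= MAX_LENGTH ? name : name.substring(0, MAX_LENGTH);
    }

    public static void main(String[] args) {
        System.out.println("NOTIFY " + quote("Complex.Channel.Name") + ", 'msg'");
        System.out.println(foldUnquoted("Simple_CHANNEL"));
    }
}
```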
You can provide a reconnect policy as a function that takes the number of retries as argument and returns an amountOfTime value:
amountOfTime < 0: the subscriber is closed and there is no retry
amountOfTime = 0: the subscriber retries to connect immediately
amountOfTime > 0: the subscriber retries after amountOfTime milliseconds
def subscriber = PgSubscriber.subscriber(vertx, [
port:5432,
host:"the-host",
database:"the-db",
user:"user",
password:"secret"
])
// Reconnect at most 10 times after 100 ms each
subscriber.reconnectPolicy({ retries ->
if (retries < 10) {
return 100L
} else {
return -1L
}
})
The default policy is to not reconnect.
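Because the policy is just a function from the retry count to a delay, other strategies follow the same shape. The Java sketch below shows a hypothetical exponential backoff variant as a `java.util.function.Function`; the 100 ms base, the 5 second cap, and the limit of 10 retries are illustrative values, and the retry count is assumed to start at 0.

```java
import java.util.function.Function;

// Illustration only: an exponential backoff policy following the
// amountOfTime contract (negative = stop, zero = now, positive = delay in ms).
class ReconnectPolicySketch {

    static final Function<Integer, Long> BACKOFF = retries -> {
        if (retries >= 10) {
            return -1L;                      // give up: no more retries
        }
        long delay = 100L << retries;        // 100, 200, 400, ... milliseconds
        return Math.min(delay, 5000L);       // never wait more than 5 seconds
    };

    public static void main(String[] args) {
        for (int retries = 0; retries <= 10; retries++) {
            System.out.println(retries + " -> " + BACKOFF.apply(retries));
        }
    }
}
```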
Postgres supports cancellation of requests in progress. You can cancel in-flight requests using cancelRequest. Cancelling a request opens a new connection to the server, cancels the request, and then closes the connection.
connection.query("SELECT pg_sleep(20)", { ar ->
if (ar.succeeded()) {
// imagine this is a long query and is still running
println("Query success")
} else {
// the server will abort the current query after cancelling request
println("Failed to query due to ${ar.cause().getMessage()}")
}
})
connection.cancelRequest({ ar ->
if (ar.succeeded()) {
println("Cancelling request has been sent")
} else {
println("Failed to send cancelling request")
}
})
The cancellation signal might or might not have any effect — for example, if it arrives after the backend has finished processing the query, then it will have no effect. If the cancellation is effective, it results in the current command being terminated early with an error message.
More information can be found in the official documentation.
To configure the client to use an SSL connection, you can configure the PgConnectOptions like a Vert.x NetClient.
All SSL modes are supported and you can configure sslmode. The client is in DISABLE SSL mode by default.
The ssl parameter is kept as a mere shortcut for setting sslmode: setSsl(true) is equivalent to setSslMode(VERIFY_CA) and setSsl(false) is equivalent to setSslMode(DISABLE).
def options = [
port:5432,
host:"the-host",
database:"the-db",
user:"user",
password:"secret",
sslMode:"VERIFY_CA",
pemTrustOptions:[
certPaths:[
"/path/to/cert.pem"
]
]
]
PgClient.connect(vertx, options, { res ->
if (res.succeeded()) {
// Connected with SSL
} else {
println("Could not connect ${res.cause()}")
}
})
More information can be found in the Vert.x documentation.
You can also configure the client to use an HTTP/1.x CONNECT, SOCKS4a or SOCKS5 proxy.
More information can be found in the Vert.x documentation.