This guide documents advanced and internal aspects of Vert.x.

It aims to explain and discuss the following

  • Vert.x design

  • Internal APIs

  • Integration with Netty

  • Code generation (TBD)

  • Metrics SPI (TBD)

Read this guide when you want to

  • better understand the Vert.x design

  • integrate Vert.x with other libraries

  • perform networking with Netty and Vert.x

This is a living guide and you can contribute: simply open a PR or an issue in the repo.

Some of the internal Vert.x APIs are exposed in this guide and you should keep in mind that these APIs are subject to change when needed.

Demystifying the Event Loop

The event loop plays an important role in Vert.x for writing highly scalable and performant network applications.

The event loop is inherited from the Netty library on which Vert.x is based.

We often use the expression running on the event loop; it has a very specific meaning: it means that the current Thread is an event loop thread. This article provides an overview of the Vert.x event loop and the concepts related to it.

The golden rule

When using Vert.x there is one Vert.x golden rule to respect:

Never block the event loop!
— Tim Fox

The code executed on the event loop should never block the event loop, for instance:

  • calling a blocking method directly, for instance reading a file with the java.io.FileInputStream API or using a JDBC connection

  • performing a long, CPU-intensive task

When the event loop is blocked:

Vertx vertx = Vertx.vertx();
vertx.setTimer(1, id -> {
  // Blocking the Vert.x event loop
  try {
    Thread.sleep(7000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
});

Vert.x will detect it and log a warning:

Jan 31, 2016 9:10:23 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2880 ms, time limit is 2000
Jan 31, 2016 9:10:24 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 3881 ms, time limit is 2000
Jan 31, 2016 9:10:25 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 4883 ms, time limit is 2000
Jan 31, 2016 9:10:26 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 5885 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
	at java.lang.Thread.sleep(Native Method)
	at org.vietj.vertx.eventloop.BlockingEventLoop.lambda$main$30(BlockingEventLoop.java:16)
	at org.vietj.vertx.eventloop.BlockingEventLoop$$Lambda$3/232824863.handle(Unknown Source)
	at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:738)
	at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:709)
	at io.vertx.core.impl.ContextImpl.lambda$wrapTask$16(ContextImpl.java:335)
	at io.vertx.core.impl.ContextImpl$$Lambda$6/295964645.run(Unknown Source)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
	at java.lang.Thread.run(Thread.java:745)

Jan 31, 2016 9:10:27 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 6890 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
	at java.lang.Thread.sleep(Native Method)
	at org.vietj.vertx.eventloop.BlockingEventLoop.lambda$main$30(BlockingEventLoop.java:16)
	at org.vietj.vertx.eventloop.BlockingEventLoop$$Lambda$3/232824863.handle(Unknown Source)
	at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:738)
	at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:709)
	at io.vertx.core.impl.ContextImpl.lambda$wrapTask$16(ContextImpl.java:335)
	at io.vertx.core.impl.ContextImpl$$Lambda$6/295964645.run(Unknown Source)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
	at java.lang.Thread.run(Thread.java:745)

The event loop must not be blocked, because it will freeze the parts of the application using that event loop, with severe consequences on the scalability and the throughput of the application.
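The warnings above come from Vert.x's BlockedThreadChecker. The principle behind such a checker can be sketched with the JDK alone: record when a task starts running on the loop, and let a separate thread periodically compare that timestamp against a limit. The names and structure below are illustrative, not the actual Vert.x implementation:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class BlockedCheckerSketch {

    public static void main(String[] args) throws Exception {
        // Start time (ms) of the task currently running on the "event loop", 0 when idle
        AtomicLong taskStart = new AtomicLong(0);
        AtomicBoolean warned = new AtomicBoolean(false);
        long limitMs = 200;

        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        ScheduledExecutorService checker = Executors.newSingleThreadScheduledExecutor();

        // The checker periodically compares the running task's start time with the limit
        checker.scheduleAtFixedRate(() -> {
            long start = taskStart.get();
            if (start != 0 && System.currentTimeMillis() - start > limitMs) {
                warned.set(true); // a real checker would log a warning with a stack trace
            }
        }, 50, 50, TimeUnit.MILLISECONDS);

        // A task that blocks the loop far beyond the limit
        eventLoop.submit(() -> {
            taskStart.set(System.currentTimeMillis());
            try {
                Thread.sleep(600);
            } catch (InterruptedException ignored) {
            }
            taskStart.set(0);
        });

        Thread.sleep(800);
        System.out.println(warned.get() ? "blocked thread detected" : "no warning");
        checker.shutdownNow();
        eventLoop.shutdownNow();
    }
}
```

Note that such a checker can only detect and report the problem; it cannot unblock the thread.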

The context

Beyond the event loop, Vert.x defines the notion of a context. At a high level, the context can be thought of as controlling the scope and order in which a set of handlers (or tasks created by handlers) are executed.

When the Vert.x API accepts a callback (for instance when setting an HttpServer request handler), it associates the callback handler with a context. This context is then used for scheduling the callback, when such a context is needed:

  • if the current thread is a Vert.x thread, it reuses the context associated with this thread: the context is propagated.

  • otherwise a new context is created for this purpose.

However there is one case where context propagation does not apply: deploying a Verticle creates a new context for this Verticle, according to its deployment options. Therefore a Verticle is always associated with a context. Any handler registered within a verticle - whether it be an event bus consumer, HTTP server handler, or any other asynchronous operation - will be registered using the verticle’s context.
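These propagation rules can be modeled with a thread local: reuse the calling thread's context if there is one, otherwise create a fresh one. This is a deliberately simplified sketch; real contexts also carry an event loop, and Vert.x does not bind plain threads like this:

```java
public class ContextPropagationSketch {
    static int nextId = 0;
    // Maps each "Vert.x thread" to its current context, as a thread local
    static final ThreadLocal<Integer> currentContext = new ThreadLocal<>();

    // Mimics the propagation rule: reuse the calling thread's context, else create one
    static int getOrCreateContext() {
        Integer ctx = currentContext.get();
        return ctx != null ? ctx : ++nextId;
    }

    public static void main(String[] args) throws Exception {
        // From a non Vert.x thread: each call creates a fresh context
        boolean fresh = getOrCreateContext() != getOrCreateContext();

        // From a thread already associated with a context: the context propagates
        Thread vertxLike = new Thread(() -> {
            currentContext.set(++nextId);
            boolean propagated = getOrCreateContext() == getOrCreateContext();
            System.out.println(fresh + " " + propagated);
        });
        vertxLike.start();
        vertxLike.join();
    }
}
```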

Vert.x provides three different types of contexts.

  • Event loop context

  • Worker context

  • Multi-threaded worker context

Event loop context

An event loop context executes handlers on an event loop: handlers are executed directly on the IO threads, as a consequence:

  • A handler will always be executed on the same thread

  • A handler must never block the thread, otherwise it will create starvation for all the IO tasks associated with that event loop.

This behavior allows for a greatly simplified threading model by guaranteeing that associated handlers will always be executed on the same thread, thus removing the need for synchronization and other locking mechanisms.
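This guarantee can be illustrated with a JDK single-threaded executor: shared state mutated only from that single thread needs neither volatile nor locks. This is a sketch of the threading model, not Vert.x code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SingleThreadStateSketch {
    // Plain field: no volatile, no lock, because only the loop thread touches it
    static int counter = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();

        // 10000 "handlers" mutating shared state, all serialized on the single thread
        for (int i = 0; i < 10_000; i++) {
            eventLoop.submit(() -> counter++);
        }

        // Read the state from the loop thread itself, like a handler would
        Future<Integer> result = eventLoop.submit(() -> counter);
        System.out.println(result.get()); // no lost updates
        eventLoop.shutdown();
    }
}
```

Had the tasks been submitted to a multi-threaded pool, the unsynchronized increments could lose updates.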

This is the default and most commonly used type of context. Verticles deployed without the worker flag will always be deployed with an event loop context.

When Vert.x creates an event loop context, it chooses an event loop for this context via a round-robin algorithm. This can be demonstrated by creating a timer many times:

System.out.println(Thread.currentThread());
Vertx vertx = Vertx.vertx();
for (int i = 0; i < 20; i++) {
  int index = i;
  vertx.setTimer(1, timerID -> {
    System.out.println(index + ":" + Thread.currentThread());
  });
}

The result is:

Thread[main,5,main]
0:Thread[vert.x-eventloop-thread-0,5,main]
1:Thread[vert.x-eventloop-thread-1,5,main]
2:Thread[vert.x-eventloop-thread-2,5,main]
3:Thread[vert.x-eventloop-thread-3,5,main]
4:Thread[vert.x-eventloop-thread-4,5,main]
5:Thread[vert.x-eventloop-thread-5,5,main]
6:Thread[vert.x-eventloop-thread-6,5,main]
7:Thread[vert.x-eventloop-thread-7,5,main]
8:Thread[vert.x-eventloop-thread-8,5,main]
9:Thread[vert.x-eventloop-thread-9,5,main]
10:Thread[vert.x-eventloop-thread-10,5,main]
11:Thread[vert.x-eventloop-thread-11,5,main]
12:Thread[vert.x-eventloop-thread-12,5,main]
13:Thread[vert.x-eventloop-thread-13,5,main]
14:Thread[vert.x-eventloop-thread-14,5,main]
15:Thread[vert.x-eventloop-thread-15,5,main]
16:Thread[vert.x-eventloop-thread-0,5,main]
17:Thread[vert.x-eventloop-thread-1,5,main]
18:Thread[vert.x-eventloop-thread-2,5,main]
19:Thread[vert.x-eventloop-thread-3,5,main]

As we can see, each timer was fired on a different event loop thread, and the threads are obtained with a round-robin policy. Note that the default number of event loop threads depends on your CPU, but this can be configured.

An event loop context guarantees to always use the same thread; however the converse is not true: the same thread can be used by different event loop contexts. The previous example clearly shows that the same thread is reused for different contexts by the round-robin policy.
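The assignment can be sketched with plain executors: N single-threaded "event loops" are created up front, and each new context is pinned to the next one modulo N. This is illustrative, not the actual Vert.x code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {
    final ExecutorService[] eventLoops;
    final AtomicInteger next = new AtomicInteger();

    RoundRobinSketch(int size) {
        eventLoops = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            eventLoops[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Each new context is pinned to the next event loop, round robin
    ExecutorService chooseEventLoop() {
        return eventLoops[next.getAndIncrement() % eventLoops.length];
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch(4);
        ExecutorService first = sketch.chooseEventLoop();
        for (int i = 0; i < 3; i++) {
            sketch.chooseEventLoop();
        }
        // After a full cycle, the fifth context shares the first event loop
        System.out.println(first == sketch.chooseEventLoop());
        for (ExecutorService loop : sketch.eventLoops) {
            loop.shutdown();
        }
    }
}
```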

The default number of event loops created by a Vertx instance is twice the number of CPU cores. This value can be overridden when creating a Vertx instance:

Vertx vertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(10));

Worker context

Worker contexts are assigned to verticles deployed with the worker option enabled. The worker context is differentiated from standard event loop contexts in that workers are executed on a separate worker thread pool.

This separation from event loop threads allows worker contexts to execute the types of blocking operations that would block an event loop: blocking such a thread will not impact the application beyond occupying one worker thread.

Just as is the case with the event loop context, worker contexts ensure that handlers are only executed on one thread at any given time. That is, handlers executed on a worker context will always be executed sequentially - one after the other - but different actions may be executed on different threads.
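This "sequential, but possibly on different threads" behavior can be reproduced with a plain JDK pool fronted by a queue that at most one thread drains at a time. The class below is a simplified, illustrative take on an ordered executor, not the actual Vert.x implementation:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderedOnPoolSketch {
    final ExecutorService pool = Executors.newFixedThreadPool(4);
    final Deque<Runnable> queue = new ArrayDeque<>();
    boolean draining; // guarded by 'queue'

    // Tasks run strictly one after another, but on whatever pool thread drains the queue
    void execute(Runnable task) {
        synchronized (queue) {
            queue.add(task);
            if (draining) {
                return; // the active drainer will pick the task up
            }
            draining = true;
        }
        pool.submit(this::drain);
    }

    private void drain() {
        while (true) {
            Runnable task;
            synchronized (queue) {
                task = queue.poll();
                if (task == null) {
                    draining = false;
                    return;
                }
            }
            task.run();
        }
    }

    public static void main(String[] args) throws Exception {
        OrderedOnPoolSketch ordered = new OrderedOnPoolSketch();
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        List<Integer> expected = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            expected.add(i);
            int n = i;
            ordered.execute(() -> seen.add(n));
        }
        ordered.pool.shutdown();
        ordered.pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(seen.equals(expected)); // order preserved despite the pool
    }
}
```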

A common pattern is to deploy worker verticles, send them messages, and have the worker reply to these messages:

Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new AbstractVerticle() {
  @Override
  public void start() throws Exception {
    vertx.eventBus().consumer("the-address", msg -> {
      try {
        Thread.sleep(10);
        System.out.println("Executed by " + Thread.currentThread());
        msg.reply("whatever");
      } catch (InterruptedException e) {
        msg.fail(0, "Interrupted");
      }
    });
  }
}, new DeploymentOptions().setWorker(true));

// Send 10 messages
send(vertx, 10);

This prints:

Executed by Thread[vert.x-worker-thread-1,5,main]
Executed by Thread[vert.x-worker-thread-2,5,main]
Executed by Thread[vert.x-worker-thread-3,5,main]
Executed by Thread[vert.x-worker-thread-4,5,main]
Executed by Thread[vert.x-worker-thread-5,5,main]
Executed by Thread[vert.x-worker-thread-6,5,main]
Executed by Thread[vert.x-worker-thread-7,5,main]
Executed by Thread[vert.x-worker-thread-8,5,main]
Executed by Thread[vert.x-worker-thread-9,5,main]
Executed by Thread[vert.x-worker-thread-10,5,main]
Executed by Thread[vert.x-worker-thread-11,5,main]

The previous example clearly shows that the worker context of the verticle uses different worker threads for delivering the messages.

However the same thread can be used by several worker verticles:

Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(2));
vertx.deployVerticle(
    TheWorker.class.getName(),
    new DeploymentOptions().setWorker(true).setInstances(4)
);
for (int i = 0;i < 10;i++) {
  vertx.eventBus().send("the-address", "the-message", reply -> {
    System.out.println(reply.result().body());
  });
}

This prints:

Executed by worker 1 with Thread[vert.x-worker-thread-1,5,main]
Executed by worker 2 with Thread[vert.x-worker-thread-0,5,main]
Executed by worker 2 with Thread[vert.x-worker-thread-0,5,main]
Executed by worker 1 with Thread[vert.x-worker-thread-1,5,main]
Executed by worker 2 with Thread[vert.x-worker-thread-0,5,main]
Executed by worker 1 with Thread[vert.x-worker-thread-1,5,main]
Executed by worker 4 with Thread[vert.x-worker-thread-1,5,main]
Executed by worker 3 with Thread[vert.x-worker-thread-0,5,main]
Executed by worker 4 with Thread[vert.x-worker-thread-1,5,main]
Executed by worker 3 with Thread[vert.x-worker-thread-0,5,main]

The same worker verticle class can be deployed several times by specifying the number of instances. This allows blocking tasks to be processed concurrently:

Vertx vertx = Vertx.vertx();
vertx.deployVerticle(
    TheWorker.class.getName(),
    new DeploymentOptions().setWorker(true).setInstances(3)
);
for (int i = 0;i < 10;i++) {
  vertx.eventBus().send("the-address", "the-message", reply -> {
    System.out.println(reply.result().body());
  });
}

Workers can schedule timers:

Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new AbstractVerticle() {
  @Override
  public void start() throws Exception {
    long now = System.currentTimeMillis();
    System.out.println("Starting timer on " + Thread.currentThread());
    vertx.setTimer(1000, id -> {
      System.out.println("Timer fired " + Thread.currentThread() + " after " + (System.currentTimeMillis() - now) + " ms");
    });
  }
}, new DeploymentOptions().setWorker(true));

This prints:

Starting timer on Thread[vert.x-worker-thread-0,5,main]
Timer fired Thread[vert.x-worker-thread-1,5,main] after 1007 ms

Again the timer thread is not the same as the thread that created the timer.

With a periodic timer:

Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new AbstractVerticle() {
  @Override
  public void start() throws Exception {
    AtomicLong count = new AtomicLong(10);
    long now = System.currentTimeMillis();
    System.out.println("Starting periodic on " + Thread.currentThread());
    vertx.setPeriodic(1000, id -> {
      if (count.decrementAndGet() < 0) {
        vertx.cancelTimer(id);
      }
      System.out.println("Periodic fired " + Thread.currentThread() + " after " + (System.currentTimeMillis() - now) + " ms");
    });
  }
}, new DeploymentOptions().setWorker(true));

we get a different thread for each event:

Starting periodic on Thread[vert.x-worker-thread-0,5,main]
Periodic fired Thread[vert.x-worker-thread-1,5,main] after 1006 ms
Periodic fired Thread[vert.x-worker-thread-2,5,main] after 2006 ms
...

Since the worker thread may block, the delivery cannot be guaranteed in time:

Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new AbstractVerticle() {
  @Override
  public void start() throws Exception {
    AtomicLong count = new AtomicLong(10);
    long now = System.currentTimeMillis();
    System.out.println("Starting periodic on " + Thread.currentThread());
    vertx.setPeriodic(1000, id -> {
      if (count.decrementAndGet() < 0) {
        vertx.cancelTimer(id);
        System.exit(0);
      }
      System.out.println("Periodic fired " + Thread.currentThread() + " after " + (System.currentTimeMillis() - now) + " ms");
    });
  }
}, new DeploymentOptions().setWorker(true));

This prints:

Starting periodic on Thread[vert.x-worker-thread-0,5,main]
Periodic fired Thread[vert.x-worker-thread-1,5,main] after 1006 ms
Periodic fired Thread[vert.x-worker-thread-2,5,main] after 2006 ms
Periodic fired Thread[vert.x-worker-thread-3,5,main] after 3005 ms
Periodic fired Thread[vert.x-worker-thread-4,5,main] after 4007 ms
Periodic fired Thread[vert.x-worker-thread-5,5,main] after 5006 ms
Periodic fired Thread[vert.x-worker-thread-6,5,main] after 6007 ms
Periodic fired Thread[vert.x-worker-thread-7,5,main] after 7006 ms
Periodic fired Thread[vert.x-worker-thread-8,5,main] after 8005 ms
Periodic fired Thread[vert.x-worker-thread-9,5,main] after 9007 ms
Periodic fired Thread[vert.x-worker-thread-10,5,main] after 10004 ms

Just like the event loop pool, the size of the worker thread pool can be configured when creating a Vertx instance:

Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(10));

Multi-threaded worker context

Multi-threaded contexts are assigned to verticles deployed with the multi-threaded option enabled. Whereas standard worker contexts execute actions in order on a variety of threads, the multi-threaded worker context removes the strong ordering of events to allow the execution of multiple events concurrently. This means that the user is responsible for performing the appropriate concurrency control such as synchronization and locking.
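Without the ordering guarantee, two handlers may mutate verticle state at the same time. A JDK-only illustration of why the user must then add concurrency control (the class name is made up for the example):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentHandlersSketch {
    static int unsafeCounter = 0;                                 // racy: updates can be lost
    static final AtomicInteger safeCounter = new AtomicInteger(); // proper concurrency control

    public static void main(String[] args) throws Exception {
        // Handlers firing concurrently, like on a multi-threaded worker context
        ExecutorService handlers = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 100_000; i++) {
            handlers.submit(() -> {
                unsafeCounter++;              // read-modify-write race
                safeCounter.incrementAndGet();
            });
        }
        handlers.shutdown();
        handlers.awaitTermination(30, TimeUnit.SECONDS);
        // The atomic counter is exact; the plain one often is not
        System.out.println(safeCounter.get());
    }
}
```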

This feature is deprecated since Vert.x 3.6 and will be removed in Vert.x 4. This feature is often confusing for the user and it is possible to achieve the same concurrency with an event-loop context and executeBlocking with ordered set to false.

Dealing with contexts

Using a context is usually transparent, Vert.x will manage contexts implicitly when deploying a Verticle, registering an Event Bus handler, etc…​ However the Vert.x API provides several ways to interact with a Context allowing for manual context switching.

The current context

The static Vertx.currentContext() method returns the current context if there is one, and null otherwise.

Vertx vertx = Vertx.vertx();
System.out.println("Current context is " + Vertx.currentContext());

Unsurprisingly we get null, regardless of the Vertx instance created before:

Current context is null

Running the same code from a verticle returns the verticle's context:

Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new AbstractVerticle() {
  public void start() throws Exception {
    System.out.println("Current context is " + Vertx.currentContext());
    System.out.println("Verticle context is " + context);
    System.exit(0);
  }
});

We get:

Current context is io.vertx.core.impl.EventLoopContext@71379d71
Verticle context is io.vertx.core.impl.EventLoopContext@71379d71

Creating or reusing a context

The vertx.getOrCreateContext() method returns the context associated with the current thread (like currentContext); otherwise it creates a new context, associates it with an event loop and returns it:

Vertx vertx = Vertx.vertx();
Context context = vertx.getOrCreateContext();
System.out.println("Current context is " + Vertx.currentContext());

Note that creating a context will not associate the current thread with it: the nature of the current thread does not change! However we can now use this context for running an action:

Vertx vertx = Vertx.vertx();
Context context = vertx.getOrCreateContext();
context.runOnContext(v -> {
  System.out.println("Current context is " + Vertx.currentContext());
});

This prints:

Current context is io.vertx.core.impl.EventLoopContext@3fcd88a9

Calling getOrCreateContext from a verticle returns the context associated with the Verticle:

Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new AbstractVerticle() {
  public void start() throws Exception {
    Context context = vertx.getOrCreateContext();
    System.out.println(context);
    System.out.println(vertx.getOrCreateContext());
  }
});

This prints:

io.vertx.core.impl.EventLoopContext@6a5235dc
io.vertx.core.impl.EventLoopContext@6a5235dc

Running on context

The io.vertx.core.Context.runOnContext(Handler) method can be used when a task needs to run on a particular context's thread.

For instance, the context thread initiates a non Vert.x action; when this action ends it needs to update some state, and that update must be done on the context thread to guarantee the state will be visible to the context thread.

Context context = Vertx.currentContext();

System.out.println("Running with context : " + Vertx.currentContext());

// Our blocking action
Thread thread = new Thread() {
  public void run() {

    // No context here!
    System.out.println("Current context : " + Vertx.currentContext());

    int n = getNumberOfFiles();
    context.runOnContext(v -> {

      // Runs on the same context
      System.out.println("Runs on the original context : " + Vertx.currentContext());
      numberOfFiles = n;
    });
  }
};

//
thread.start();

This prints:

Running with context : io.vertx.core.impl.EventLoopContext@69cdd6d8
Current context : null
Runs on the original context : io.vertx.core.impl.EventLoopContext@69cdd6d8

The vertx.runOnContext(Handler<Void>) method is a shortcut for what we have seen before: it calls the getOrCreateContext method and schedules a task for execution via the context.runOnContext(Handler<Void>) method.

Blocking

Before Vert.x 3, using a blocking API required deploying a worker Verticle. Vert.x 3 provides an additional way to call a blocking API:

vertx.runOnContext(v -> {

  // On the event loop
  System.out.println("Calling blocking block from " + Thread.currentThread());

  Handler<Future<String>> blockingCodeHandler = future -> {
    // Non event loop
    System.out.println("Computing with " + Thread.currentThread());
    future.complete("some result");
  };

  Handler<AsyncResult<String>> resultHandler = result -> {
    // Back to the event loop
    System.out.println("Got result in " + Thread.currentThread());
  };

  // Execute the blocking code handler and the associated result handler
  vertx.executeBlocking(blockingCodeHandler, resultHandler);
});

This prints:

Calling blocking block from Thread[vert.x-eventloop-thread-0,5,main]
Computing with Thread[vert.x-worker-thread-0,5,main]
Got result in Thread[vert.x-eventloop-thread-0,5,main]

While the blocking action executes with a worker thread, the result handler is executed with the same event loop context.

The blocking action is provided a Future argument that is used for signaling when the result is obtained, usually a result of the blocking API.

When the blocking action fails the result handler will get the failure as cause of the async result object:

Vertx vertx = Vertx.vertx();
vertx.runOnContext(v -> {

  Handler<Future<String>> blockingCodeHandler = future -> {
    throw new RuntimeException();
  };

  Handler<AsyncResult<String>> resultHandler = result -> {
    if (result.succeeded()) {
      System.out.println("Got result");
    } else {
      System.out.println("Blocking code failed");
      result.cause().printStackTrace(System.out);
    }
  };

  vertx.executeBlocking(blockingCodeHandler, resultHandler);
});

This prints:

Blocking code failed
java.lang.RuntimeException
at org.vietj.vertx.eventloop.ExecuteBlockingThrowingFailure.lambda$null$0(ExecuteBlockingThrowingFailure.java:19)
at org.vietj.vertx.eventloop.ExecuteBlockingThrowingFailure$$Lambda$4/163784093.handle(Unknown Source)
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:217)
at io.vertx.core.impl.ContextImpl$$Lambda$6/1645685573.run(Unknown Source)
at io.vertx.core.impl.OrderedExecutorFactory$OrderedExecutor.lambda$new$180(OrderedExecutorFactory.java:91)
at io.vertx.core.impl.OrderedExecutorFactory$OrderedExecutor$$Lambda$2/1053782781.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

The blocking action can also report the failure on the Future object:

Vertx vertx = Vertx.vertx();
vertx.runOnContext(v -> {

  Handler<Future<String>> blockingCodeHandler = future -> {
    try {
      throw new Exception();
    } catch (Exception e) {
      future.fail(e);
    }
  };

  Handler<AsyncResult<String>> resultHandler = result -> {
    if (result.succeeded()) {
      System.out.println("Got result");
    } else {
      System.out.println("Blocking code failed");
      result.cause().printStackTrace(System.out);
    }
  };

  vertx.executeBlocking(blockingCodeHandler, resultHandler);
});

Obviously executing a task from the blocking action on the context will use the event loop:

vertx.runOnContext(v -> {

  // On the event loop
  System.out.println("Calling blocking block from " + Thread.currentThread());

  Handler<Future<String>> blockingCodeHandler = future -> {
    // Non event loop
    System.out.println("Computing with " + Thread.currentThread());

    // Running on context from the worker
    vertx.runOnContext(v2 -> {
      System.out.println("Running on context from the worker " + Thread.currentThread());
    });
  };

  // Execute the blocking code handler and the associated result handler
  vertx.executeBlocking(blockingCodeHandler, result -> {});
});

Which outputs:

Calling blocking block from Thread[vert.x-eventloop-thread-0,5,main]
Computing with Thread[vert.x-worker-thread-0,5,main]
Running on context from the worker Thread[vert.x-eventloop-thread-0,5,main]

This API is somewhat similar to deploying a worker Verticle, although its purpose is to execute a single blocking operation from an event loop context.

While executeBlocking is a Vertx method, the blocking actions are scheduled on the underlying context and serialized, i.e. executed one after another and not in parallel.

Execute blocking for any particular verticle instance uses the same context as that instance.

By default, if you call executeBlocking multiple times in any particular instance they will be executed in the order you called them. Otherwise you would get into a mess, e.g. if you did an executeBlocking to insert some data into a table, followed by another to select from that table, then there would be no guarantee in which order they occurred, so you might not find your data.

When several blocking tasks are submitted, the current implementation picks an available worker for executing the first task; after executing it, the worker executes any pending tasks. Once all tasks are done, the worker goes back to the worker pool.

It is also possible to execute unordered blocks, i.e. the blocks can be executed in parallel, by setting the ordered argument to false:

vertx.runOnContext(v -> {

  // The blocks are executed on any available worker thread
  vertx.executeBlocking(blockingCodeHandler1, false, ar -> {});
  vertx.executeBlocking(blockingCodeHandler2, false, ar -> {});
  vertx.executeBlocking(blockingCodeHandler3, false, ar -> {});
});

Determining the kind of context

The kind of a context can be determined with the methods:

  • Context#isEventLoopContext

  • Context#isWorkerContext

  • Context#isMultiThreadedWorkerContext

The nature of the context does not guarantee the nature of the thread: indeed, the executeBlocking method can execute a task with a worker thread in an event loop context.

Determining the kind of thread

As said earlier, the nature of the context impacts the concurrency. The executeBlocking method can even use a worker thread in an event loop context. The kind of thread should instead be determined with the static methods:

  • Context#isOnEventLoopThread()

  • Context#isOnWorkerThread()
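One common way to implement such thread checks, close in spirit to what Vert.x does with its internal thread class, is a Thread subclass carrying a flag. The class below is made up for illustration:

```java
public class ThreadKindSketch {

    // A marker thread class carrying a flag, similar in spirit to Vert.x's internal thread class
    static class VertxLikeThread extends Thread {
        final boolean worker;
        VertxLikeThread(Runnable target, boolean worker) {
            super(target);
            this.worker = worker;
        }
    }

    // The check inspects the identity of the current thread, not the context
    static boolean isOnEventLoopThread() {
        Thread t = Thread.currentThread();
        return t instanceof VertxLikeThread && !((VertxLikeThread) t).worker;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(isOnEventLoopThread()); // main thread: false

        Thread loop = new VertxLikeThread(
            () -> System.out.println(isOnEventLoopThread()), false); // loop thread: true
        loop.start();
        loop.join();
    }
}
```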

Concurrency

When the Vert.x API needs a context, it calls the vertx.getOrCreateContext() method. When the Vert.x API is used from a context, for instance when deploying a Verticle, any service created from this Verticle will reuse the same context, for instance:

  • Creating a server

  • Creating a client

  • Creating a timer

  • Registering an event bus handler

  • etc…​

Such services will call back the Verticle that created them at some point; the context remains the same, but its nature has a direct impact on concurrency as it governs the threading model:

vertx.deployVerticle(new AbstractVerticle() {
  int value = 0;
  @Override
  public void start() throws Exception {
    vertx.setPeriodic(100, id -> value++);
    vertx.eventBus().consumer("the-address", msg -> {
      msg.reply(value);
    });
  }
});

Deployed as a worker, no exclusion is required, however the changes must be visible between threads, pretty much like this:

vertx.deployVerticle(new AbstractVerticle() {
  volatile int value;
  @Override
  public void start() throws Exception {
    vertx.setPeriodic(100, id -> value++);
    vertx.eventBus().consumer("the-address", msg -> {
      msg.reply(value);
    });
  }
}, new DeploymentOptions().setWorker(true));

Embedding Vert.x

When Vert.x is embedded, such as in a Java main method or a JUnit test, the thread creating the Vertx instance can be any kind of thread, but it is certainly not a Vert.x thread. Any action that requires a context will implicitly create an event loop context for executing this action.

Vertx vertx = Vertx.vertx();
HttpServer server = vertx.createHttpServer();
server.listen(result -> {
  // This runs in a context created just for the purpose of this http server
});

When several actions are performed, they will use different contexts and are very likely to use different event loop threads:

Vertx vertx = Vertx.vertx();
vertx.createHttpServer().requestHandler(requestHandler).listen(result -> {
  // This executes in a context
  System.out.println("Current thread is " + Thread.currentThread());
  });
vertx.createHttpServer().requestHandler(requestHandler).listen(result -> {
  // This executes in a different context
  System.out.println("Current thread is " + Thread.currentThread());
  });

This prints:

Current thread is Thread[vert.x-eventloop-thread-1,5,main]
Current thread is Thread[vert.x-eventloop-thread-0,5,main]

Therefore accessing a shared state from both servers should not be done!

When the same context needs to be used then the actions can be grouped with a runOnContext call:

Vertx vertx = Vertx.vertx();
vertx.runOnContext(v -> {
  vertx.createHttpServer().requestHandler(requestHandler).listen(result -> {
    // This executes in a context
    System.out.println("Current thread is " + Thread.currentThread());
  });
  vertx.createHttpServer().requestHandler(requestHandler).listen(result -> {
    // This executes in the same context
    System.out.println("Current thread is " + Thread.currentThread());
  });
});

This prints:

Current thread is Thread[vert.x-eventloop-thread-0,5,main]
Current thread is Thread[vert.x-eventloop-thread-0,5,main]

Now we can share state between the two servers safely.

Integrating Netty

Netty is one of the dependencies of Vert.x. In fact, Netty powers the networking services of Vert.x. Vert.x Core provides the basic network services one can expect from such a library:

  • TCP

  • HTTP

  • UDP

  • DNS

These are built with various components from Netty. The Netty community has implemented a wide range of components and this chapter explains how to integrate such components in Vert.x.

In this chapter we will build a TIME protocol client and server. The Netty documentation provides client/server implementations of this simple protocol; we will focus on the integration of these components.

Netty integration points

The main purpose of this chapter is to explain some of Vert.x’s internal interfaces. Such interfaces are extensions that expose low-level methods for interacting with Netty, useful for components that re-use Netty directly.

Most users don’t need to deal with this extension and thus such methods are isolated in an extension interface.

Bootstrapping clients

ContextInternal extends io.vertx.core.Context and, like VertxInternal, exposes various Netty integration points.

Usually contexts are obtained from the Vertx#getOrCreateContext() method that returns the current execution context or creates a new one if necessary: when called in a Verticle, getOrCreateContext() returns the context of this Verticle; when used in a non Vert.x thread, like a main or a unit test, it creates a new one and returns it.

Context context = vertx.getOrCreateContext();

// Cast to access extra methods
ContextInternal contextInternal = (ContextInternal) context;

Contexts are always associated with a Netty event loop and thus using this context ensures our components re-use the same event loop if one existed before or use a new one.

The ContextInternal#nettyEventLoop() method returns this particular event loop and we can use it on Bootstrap (for a client) or ServerBootstrap (for a server):

ContextInternal contextInt = (ContextInternal) context; (1)
EventLoop eventLoop = contextInt.nettyEventLoop();

Bootstrap bootstrap = new Bootstrap(); (2)
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(eventLoop);
1 get the event-loop associated with this context
2 create a bootstrap for the client

Bootstrapping servers

VertxInternal extends io.vertx.core.Vertx; among other things, VertxInternal#getAcceptorEventLoopGroup() returns an EventLoopGroup for accepting connections on a server. Its typical usage is on a ServerBootstrap:

ContextInternal contextInt = (ContextInternal) context; (1)
EventLoop eventLoop = contextInt.nettyEventLoop();

VertxInternal vertxInt = contextInt.owner(); (2)
EventLoopGroup acceptorGroup = vertxInt.getAcceptorEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap(); (3)
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(acceptorGroup, eventLoop);
1 get the event-loop associated with this context
2 get the acceptor event-loop group of Vertx
3 create the bootstrap for the server

Handling events

Now that we are more familiar with ContextInternal, let’s look at how we can use it to handle Netty events such as network events, channel life cycle, etc.

The ContextInternal#executeFromIO method is used to dispatch events to the application, as it ensures:

  • the context concurrency: reuse the current event-loop thread or execute on a worker

  • the thread local association of the current context with the dispatch thread

  • any uncaught exception thrown is reported on the context, such exception is either logged or passed to the Context#exceptionHandler

Here is a short example showing a server bootstrap:

Handler<ChannelFuture> bindHandler = fut -> {
  if (fut.isSuccess()) {
    // Signal application with bind success
  } else {
    // Signal application with bind error
  }
};

(1)
bootstrap.bind(socketAddress).addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
    (2)
    context.executeFromIO(future, bindHandler);
  }
});
1 bind the server and register a listener for the bind result
2 use executeFromIO to dispatch the bind result to the bindHandler

Typical usage of executeFromIO:

  • listen handler

  • connect handler

  • request handler

  • close handler

  • etc…​
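As an analogy in plain JDK code (this is not the Vert.x API, and the class and method names below are purely illustrative), executeFromIO plays the role of a dispatch helper that runs a handler for an event and routes any uncaught exception to a context-level exception handler instead of letting it escape to the I/O thread:

```java
import java.util.function.Consumer;

// Illustrative sketch (not the Vert.x API): a dispatch helper in the spirit
// of executeFromIO that runs a handler for an event and reports any uncaught
// exception to a context-level exception handler.
public class DispatchSketch {

  // Default behavior when no exception handler is set: log the exception
  private Consumer<Throwable> exceptionHandler = Throwable::printStackTrace;

  public void exceptionHandler(Consumer<Throwable> handler) {
    this.exceptionHandler = handler;
  }

  // Run the handler with the event, reporting uncaught exceptions
  public <E> void dispatch(E event, Consumer<E> handler) {
    try {
      handler.accept(event);
    } catch (RuntimeException e) {
      // Report instead of propagating to the calling (I/O) thread
      exceptionHandler.accept(e);
    }
  }
}
```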

The server

The original server example can be found here.

The Vert.x TIME server exposes a simple API:

  • a static method to create a TimeServer

  • two methods: listen to bind a server and close to unbind

  • the requestHandler method for setting a handler for handling requests

public interface TimeServer {

  /**
   * @return a new time server
   */
  static TimeServer create(Vertx vertx) {
    return new TimeServerImpl(vertx);
  }

  /**
   * Set the handler to be called when a time request happens. The handler should complete
   * the future with the time value.
   *
   * @param handler the handler to be called
   * @return this object
   */
  TimeServer requestHandler(Handler<Future<Long>> handler);

  /**
   * Start and bind the time server.
   *
   * @param port the server port
   * @param host the server host
   * @param listenHandler the listen result handler
   */
  void listen(int port, String host, Handler<AsyncResult<Void>> listenHandler);

  /**
   * Close the time server.
   */
  void close();

}

A TIME server serving the current JVM time is then straightforward to implement:

Vertx vertx = Vertx.vertx();

// Create the time server
TimeServer server = TimeServer.create(vertx);
server.requestHandler(time -> {
  time.complete(System.currentTimeMillis());
});

// Start the server
server.listen(8037, "0.0.0.0", ar -> {
  if (ar.succeeded()) {
    System.out.println("Server started");
  } else {
    ar.cause().printStackTrace();
  }
});

Let’s study now the server implementation.

The server bootstrap

First let’s have a look at the ServerBootstrap creation and configuration

EventLoopGroup acceptorGroup = vertx.getAcceptorEventLoopGroup(); (1)
EventLoop eventLoop = context.nettyEventLoop(); (2)
bootstrap = new ServerBootstrap(); (3)
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(acceptorGroup, eventLoop);
bootstrap.childHandler(new ChannelInitializer<Channel>() {
  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline(); (4)
    TimeServerHandler handler = new TimeServerHandler(context, requestHandler);
    pipeline.addLast(handler);
  }
});
1 VertxInternal returns the event loop group to use as acceptor group
2 ContextInternal returns the event loop to use as child group
3 create and configure the Netty’s ServerBootstrap
4 configure the channel with the TimeServerHandler initialized with the server requestHandler

The creation of the ServerBootstrap is quite straightforward and is very similar to the original version. The main difference is that we reuse the event-loop provided by the Verticle and Vert.x. This ensures that our server shares the same resources as our application.

Notice that the TimeServerHandler is initialized with the server requestHandler; this handler will be used when serving TIME requests.

The server bind

Now let’s have a look at the bind operation; again it’s very simple and does not differ much from the original example:

ChannelFuture bindFuture = bootstrap.bind(host, port);
bindFuture.addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) {
    context.executeFromIO(v -> { (1)

      (2)
      if (future.isSuccess()) {
        channel = future.channel();
        listenHandler.handle(Future.succeededFuture(null));
      } else {
        listenHandler.handle(Future.failedFuture(future.cause()));
      }
    });
  }
});
1 use executeFromIO to dispatch to the application
2 call the listen handler either with a success or a failure

The most important part is that we use executeFromIO to dispatch the result to the listenHandler in order to make the application aware of the bind result.

The server handler

Now let’s finish our server with the TimeServerHandler, which is an adaptation of the Netty’s original TimeServerHandler:

Future<Long> result = Future.future(); (1)

context.executeFromIO(result, requestHandler); (2)

result.setHandler(ar -> { (3)
  if (ar.succeeded()) {  (4)
    ByteBuf time = ctx.alloc().buffer(4);
    time.writeInt((int) (ar.result() / 1000L + 2208988800L));
    ChannelFuture f = ctx.writeAndFlush(time);
    f.addListener((ChannelFutureListener) channelFuture -> ctx.close());
  } else {  (5)
    ctx.close();
  }
});
1 create a new blank future that will be resolved by the requestHandler
2 let the context dispatch to the requestHandler with executeFromIO
3 the future handler is called when the requestHandler implementation completes the future
4 write the current TIME to the channel and close it after
5 when the application fails we simply close the socket

Again here, executeFromIO is used when a TIME request event happens: the future to be completed is passed to the requestHandler. When this future is completed, the handler either writes the time result to the channel or closes it.
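The 2208988800 constant seen in the handler comes from the TIME protocol (RFC 868), which counts seconds since 1900-01-01, while the JVM counts milliseconds since 1970-01-01. The conversion pair used by the server and the client can be checked in isolation (the class and method names here are illustrative, not part of the example code):

```java
// Illustrative helper mirroring the epoch conversions used by the TIME
// server (encode) and client (decode) handlers
public class TimeEpoch {

  // Seconds between the TIME epoch (1900-01-01) and the Unix epoch (1970-01-01)
  static final long EPOCH_OFFSET = 2208988800L;

  // Server side: encode a Unix time in milliseconds as a 32-bit unsigned TIME value
  static long encode(long unixMillis) {
    return (unixMillis / 1000L + EPOCH_OFFSET) & 0xFFFFFFFFL;
  }

  // Client side: decode a 32-bit unsigned TIME value back to Unix milliseconds
  static long decode(long timeValue) {
    return (timeValue - EPOCH_OFFSET) * 1000L;
  }
}
```

Note that the round trip loses the sub-second part, since the TIME protocol only carries whole seconds.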

The client

The original client example can be found here.

The Vert.x time client exposes a simple API:

  • a static method for creating a TimeClient

  • the client getTime method for retrieving a time value from a server

public interface TimeClient {

  /**
   * @return a new time client
   */
  static TimeClient create(Vertx vertx) {
    return new TimeClientImpl(vertx);
  }

  /**
   * Fetch the current time from a server.
   *
   * @param port the server port
   * @param host the server host name
   * @param resultHandler the asynchronous time result
   */
  void getTime(int port, String host, Handler<AsyncResult<Long>> resultHandler);

}

The TIME client is straightforward to use:

Vertx vertx = Vertx.vertx();

// Create the time client
TimeClient server = TimeClient.create(vertx);

// Fetch the time
server.getTime(8037, "localhost", ar -> {
  if (ar.succeeded()) {
    System.out.println("Time is " + new Date(ar.result()));
  } else {
    ar.cause().printStackTrace();
  }
});

Let’s study now the client implementation.

The client bootstrap

First let’s have a look at the Bootstrap creation and configuration

EventLoop eventLoop = context.nettyEventLoop();  (1)

// Create and configure the Netty bootstrap
Bootstrap bootstrap = new Bootstrap(); (2)
bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<Channel>() {
  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline(); (3)
    pipeline.addLast(new TimeClientHandler(context, resultHandler));
  }
});

return bootstrap;
1 ContextInternal returns the event loop to use as child group
2 create and configure the Netty’s Bootstrap
3 configure the channel with the TimeClientHandler initialized with the client resultHandler

The creation of the Bootstrap is quite straightforward and is very similar to the original version. The main difference is that we reuse the event-loop provided by the Verticle. This ensures that our client reuses the same event-loop as our verticle.

Like in the server example we use the ContextInternal to obtain Netty’s EventLoop to set on the Bootstrap.

Notice that the TimeClientHandler is initialized with the client resultHandler; this handler will be called with the TIME request result.

The client connect

The bootstrap setup is very similar to the original example; in case of a failure the application callback again uses executeFromIO, for the same reason as before.

The TimeClientHandler integration also uses executeFromIO for calling back the application:

ChannelFuture connectFuture = bootstrap.connect(host, port); (1)
connectFuture.addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) { (2)
      context.executeFromIO(v -> { (3)
        resultHandler.handle(io.vertx.core.Future.failedFuture(future.cause()));
      });
    }
  }
});
1 connect to the server
2 upon connect error we call the result handler with a failure
3 use executeFromIO to dispatch the connect failure to the application

We only care about propagating a connect failure to the application; when the bootstrap connects successfully, the TimeClientHandler handles the network response to the application.

When a connect failure happens, like in the server implementation, executeFromIO is used to dispatch the failure to the resultHandler in order to make the application aware of the connect error.

The client handler

Now let’s finish our client with the TimeClientHandler, which is an adaptation of Netty’s original TimeClientHandler:

ByteBuf m = (ByteBuf) msg;
long currentTimeMillis;
try {
  currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; (1)
  context.executeFromIO(Future.succeededFuture(currentTimeMillis), resultHandler); (2)
  resultHandler = null; (3)
  ctx.close(); (4)
} finally {
  m.release();
}
1 decode the time response from the server
2 let the context dispatch the result to the resultHandler with executeFromIO
3 set the resultHandler to null
4 close the channel

Again here, executeFromIO is used when a TIME response event happens to dispatch the result to the resultHandler.

Using Netty TCP codecs

In the previous section we have examined how Vert.x and Netty can share resources and the propagation of Netty events to a Vert.x application. In this section we will study the integration of existing Netty codecs.

Netty codecs are great for encapsulating and reusing network protocols encoder and decoder. The base Netty distribution provides a few codecs for popular protocols such as HTTP, Redis, Memcached or MQTT.

Clients and servers can be built on top of these codecs with Vert.x, e.g. the Vert.x HTTP components reuse Netty’s HTTP codec, and likewise the Vert.x MQTT component reuses Netty’s MQTT codec.

Vert.x TCP client/server can be customized to reuse Netty codecs. In fact the channel of a NetSocket can be used to customize the pipeline and read/write arbitrary messages.

There is a lot of value in reusing NetSocket this way:

  • extend the Vert.x ecosystem, your clients/servers will be fully integrated with this ecosystem, i.e you can mix and match your middleware with existing Vert.x middleware, filesystem, etc…​

  • build on top of NetSocket features

    • SSL/TLS

    • Domain sockets

    • Client Socks/HTTP proxy handling

    • Server verticle scaling

    • Metrics

    • SNI handling

In this chapter we will write a client, but the same techniques can be applied to writing a server on top of a Netty codec in the same way.

Everything achieved in this chapter can also be achieved using the techniques shown in the Integrating Netty chapter.

The Memcached client

As example we will build in this chapter a simple Memcached client on top of Netty’s Memcached binary codec.

Memcached is a popular free and open source, high-performance, distributed memory object caching system.

There are two versions of the protocol, text and binary. In this section we will build a client for the binary protocol described in this document.

The client is very straightforward to use:

Vertx vertx = Vertx.vertx();

// Create the memcached client
MemcachedClient.connect(vertx, 11211, "localhost", ar1 -> {
  if (ar1.succeeded()) {

    // Connected to memcached
    System.out.println("connected");

    // Get the client
    MemcachedClient client = ar1.result();

    // Put a value
    client.set("foo", "bar", ar2 -> {
      if (ar2.succeeded()) {

        System.out.println("Put successful");

        // Now retrieve the same value
        client.get("foo", ar3 -> {
          if (ar3.succeeded()) {
            String res = ar3.result();
            System.out.println("Get successful " + res + "");
          } else {
            ar3.cause().printStackTrace();
          }
        });
      } else {
        ar2.cause().printStackTrace();
      }
    });
  } else {
    ar1.cause().printStackTrace();
  }
});

You can easily start a Memcached server with Docker to try this example:

> docker run --rm --name my-memcache -p 11211:11211 -d memcached

Anatomy of the Memcached client

The client provides a simple API for connecting to a Memcached server and get/set entries.

public interface MemcachedClient {

  /**
   * Connect to memcached, the {@code completionHandler} will get the {@link MemcachedClient} instance.
   */
  static void connect(Vertx vertx, int port, String host, Handler<AsyncResult<MemcachedClient>> completionHandler) {
    MemcachedClientImpl.connect(vertx, port, host, null, completionHandler);
  }

  /**
   * Connect to memcached, the {@code completionHandler} will get the {@link MemcachedClient} instance.
   */
  static void connect(Vertx vertx, int port, String host, NetClientOptions options, Handler<AsyncResult<MemcachedClient>> completionHandler) {
    MemcachedClientImpl.connect(vertx, port, host, options, completionHandler);
  }

  /**
   * Get a cached entry.
   *
   * @param key the entry key
   * @param completionHandler the handler called with the result
   */
  void get(String key, Handler<AsyncResult<@Nullable String>> completionHandler);

  /**
   * Set a cached entry.
   *
   * @param key the entry key
   * @param value the entry value
   * @param completionHandler the handler called with the result
   */
  void set(String key, String value, Handler<AsyncResult<Void>> completionHandler);

}

The Memcached codec

The Memcached codec provided by Netty takes care of encoding and decoding Netty ByteBuf to and from Memcached requests and responses.

Our client will only need to use Memcached objects:

  • write FullBinaryMemcacheRequest to the pipeline

    • has a key property: a ByteBuf providing the cached entry key

    • has an opCode property: an enum indicating the operation, GET or SET

    • has an extras property: a ByteBuf providing extra information, only used in Memcached SET requests

    • has a content property: a ByteBuf providing the cached entry value, only used in Memcached SET requests

  • read FullBinaryMemcacheResponse from the pipeline

    • has a status property: a value equal to 0 when the operation was successful

    • has a content property: a ByteBuf providing the cached entry value, only used in Memcached GET responses

Memcached provides a richer protocol than just GET and SET, but we don’t cover it in this section, as the goal is only a demonstration and not a complete client.

Connecting to the server

Let’s look first at the client connect implementation:

NetClient client = options != null ? vertx.createNetClient(options) : vertx.createNetClient();

// Connect to the memcached instance
client.connect(port, host, ar -> {
  if (ar.succeeded()) {
    // Get the socket
    NetSocketInternal so = (NetSocketInternal) ar.result();

    // Create the client
    MemcachedClientImpl memcachedClient = new MemcachedClientImpl(so);

    // Initialize the client: configure the pipeline and set the handlers
    memcachedClient.init();

    // Return the memcached instance to the client
    completionHandler.handle(Future.succeededFuture(memcachedClient));
  } else {
    completionHandler.handle(Future.failedFuture(ar.cause()));
  }
});

The connect implementation creates a Vert.x NetClient to connect to the actual Memcached server. When the connect is a success:

  • the Vert.x NetSocket is cast to NetSocketInternal

  • the Memcached client is created and initialized

The NetSocketInternal is an advanced interface that gives access to a few extra methods that we need to build the client:

  • channelHandlerContext() returns the context of the NetSocket Netty’s handler

  • writeMessage(Object, Handler<AsyncResult<Void>>) writes an object to the pipeline

  • messageHandler(Handler<Object>) sets a handler for processing pipeline messages

The Memcached client init method uses some of them to:

  • initialize the NetSocket with the Memcached codec

  • set a message handler to process the Memcached responses

ChannelPipeline pipeline = so.channelHandlerContext().pipeline();

// Add the memcached message aggregator
pipeline.addFirst("aggregator", new BinaryMemcacheObjectAggregator(Integer.MAX_VALUE));

// Add the memcached decoder
pipeline.addFirst("memcached", new BinaryMemcacheClientCodec());

// Set the message handler to process memcached message
so.messageHandler(this::processResponse);

Request / response correlation

The Memcached protocol is a pipelined protocol: the responses are received in the same order as the requests are sent.

Therefore the client needs to maintain an inflight FIFO queue which is a simple Java ConcurrentLinkedQueue. When a request is sent to the Memcached server, the response handler is added to the queue. When the response is received the handler is dequeued and can process the response.
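The correlation mechanism can be sketched with plain JDK types (the class, field, and method names below are illustrative, not the actual client implementation, and a String stands in for the Memcached response type):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Illustrative sketch of the in-flight FIFO correlation used by the client
public class InflightSketch {

  // One response handler per request currently in flight, in request order
  private final Queue<Consumer<String>> inflight = new ConcurrentLinkedQueue<>();

  // Called after a request has been written to the pipeline
  public void requestSent(Consumer<String> responseHandler) {
    inflight.add(responseHandler);
  }

  // Called for each decoded response: the head of the queue matches it,
  // because the protocol guarantees responses arrive in request order
  public void responseReceived(String response) {
    inflight.poll().accept(response);
  }
}
```

This works precisely because of the pipelining guarantee; a protocol that reorders responses would need request identifiers instead of a FIFO queue.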

Sending Memcached request messages

The client has a writeRequest method that sends a request to the pipeline:

  • write the request message

  • when the write is successful, add the response handler to the inflight queue so the responses can be processed

so.writeMessage(request, ar -> {
  if (ar.succeeded()) {
    // The message has been encoded successfully and sent
    // we add the handler to the inflight queue
    inflight.add(completionHandler);
  } else {
    // The message could not be encoded or sent
    // we signal an error
    completionHandler.handle(Future.failedFuture(ar.cause()));
  }
});

Processing Memcached response messages

The client has a processResponse method that is called each time the Memcached codec decodes a response:

  • dequeue the response handler

  • release the Netty message: since the response messages are reference counted and pooled, release() must be called, otherwise a memory leak will happen

FullBinaryMemcacheResponse response = (FullBinaryMemcacheResponse) msg;

try {
  // Get the handler that will process the response
  Handler<AsyncResult<FullBinaryMemcacheResponse>> handler = inflight.poll();

  // Handle the message
  handler.handle(Future.succeededFuture(response));
} finally {
  // Release the referenced counted message
  response.release();
}

Sending Memcached GET requests

Memcached GET is fairly straightforward

  • create a FullBinaryMemcacheRequest

    • set the key property

    • set the opCode property to BinaryMemcacheOpcodes.GET

  • call writeRequest passing the request and providing the response handler

ByteBuf keyBuf = Unpooled.copiedBuffer(key, StandardCharsets.UTF_8);

// Create the memcached request
FullBinaryMemcacheRequest request = new DefaultFullBinaryMemcacheRequest(keyBuf, Unpooled.EMPTY_BUFFER);

// Set the memcached operation opcode to perform a GET
request.setOpcode(BinaryMemcacheOpcodes.GET);

// Execute the request
writeRequest(request, ar -> {
  if (ar.succeeded()) {
    // Get the response
    processGetResponse(ar.result(), completionHandler);
  } else {
    // Network error
    completionHandler.handle(Future.failedFuture(ar.cause()));
  }
});

Processing Memcached GET responses

Memcached GET responses are processed by processGetResponse

short status = response.status();
switch (status) {

  case 0:
    // Successful get
    String value = response.content().toString(StandardCharsets.UTF_8);
    completionHandler.handle(Future.succeededFuture(value));
    break;

  case 1:
    // Empty response -> null
    completionHandler.handle(Future.succeededFuture());
    break;

  default:
    // Memcached error
    completionHandler.handle(Future.failedFuture(new MemcachedError(status)));
    break;
}

The status property of the response indicates whether the response is successful or not. We need to pay special attention when the status is 1 as the client handles it as a Java null value.

Sending Memcached SET requests

Memcached SET is straightforward too

  • create a FullBinaryMemcacheRequest

    • set the key property

    • set the opCode property to BinaryMemcacheOpcodes.SET

    • set the extras property to the value 0xDEADBEEF_00001C20

      • 0xDEADBEEF must be used per the protocol

      • 00001C20 is the expiration time set to 2 hours

    • set the value property

  • call writeRequest passing the request and providing the response handler

ByteBuf keyBuf = Unpooled.copiedBuffer(key, StandardCharsets.UTF_8);
ByteBuf valueBuf = Unpooled.copiedBuffer(value, StandardCharsets.UTF_8);

// Create the extras: the 0xDEADBEEF flags followed by a 2 hours expiration time
ByteBuf extrasBuf = Unpooled.buffer(8);
extrasBuf.writeLong(0xDEADBEEF_00001C20L);

// Create the memcached request
FullBinaryMemcacheRequest request = new DefaultFullBinaryMemcacheRequest(keyBuf, extrasBuf, valueBuf);

// Set the memcached operation opcode to perform a SET
request.setOpcode(BinaryMemcacheOpcodes.SET);

// Execute the request
writeRequest(request, ar -> {
  if (ar.succeeded()) {
    // Get the response
    processSetResponse(ar.result(), completionHandler);
  } else {
    // Network error
    completionHandler.handle(Future.failedFuture(ar.cause()));
  }
});

Processing Memcached SET responses

Memcached SET responses are processed by processSetResponse

short status = response.status();
if (status == 0) {
  // Successful set
  completionHandler.handle(Future.succeededFuture());
} else {
  // Memcached error
  completionHandler.handle(Future.failedFuture(new MemcachedError(status)));
}

Close hooks

Close hooks are an internal feature of Vert.x useful for creating components that are notified when a Verticle or a Vertx instance is closed. They can be used to implement automatic clean-up in verticles, as the Vert.x HTTP server does.

The contract for receiving a close notification is defined by the io.vertx.core.Closeable interface and its close(Handler<AsyncResult<Void>> completionHandler) method:

public class ContextCloseHook implements Closeable {

  private final Context context;

  public ContextCloseHook(Vertx vertx) {
    this.context = vertx.getOrCreateContext();

    // Get notified when this context closes
    context.addCloseHook(this);
  }

  @Override
  public void close(Handler<AsyncResult<Void>> completionHandler) {
    // Create a new future
    Future<Void> fut = Future.future();

    // Set the close handler to be notified when the future resolves
    fut.setHandler(completionHandler);

    // Do cleanup, the method will complete the future
    doClose(fut);
  }

  /**
   * API close method - this is called by the user
   */
  public void close() {

    // Remove the hook
    context.removeCloseHook(this);

    // Do cleanup, the method will complete the future
    doClose(Future.future());
  }

  private void doClose(Future<Void> fut) {

    // No-op, in reality it would be a resource like a Netty channel, a file, etc...
    fut.complete();
  }
}

The Context#addCloseHook method registers a Closeable instance to be notified when the context closes:

context.addCloseHook(closeable);

This is only valid for Verticles: when a Verticle is undeployed, its associated context is closed and the close hooks are processed.

The component should still expose a close method in its API to allow the user to explicitly close the resource; this method should call Context#removeCloseHook to remove the hook.

context.removeCloseHook(closeable);

Likewise VertxInternal provides the same operation to receive notifications when a Vertx instance is closed.