DONT THREAD ON ME
Oct 1, 2002
by Nyc_Tattoo
Floss Finder
typed this up for some reason:
For the last week or so I've been working on an Elasticsearch client (really just a wrapper for the Java library) in Scala.
The main motivation is that the most popular Scala Elasticsearch library returns a scala.concurrent.Future, but our code
uses twitter.util.Future, and we have to convert between the two. That isn't the end of the world, but it's annoying,
and the conversion hands work to a thread pool when it really isn't necessary. Also, I really dislike the fancy DSL exposed by
the popular library (elastic4s); the official ES library is much easier to use and I'd rather just use that, even though
it relies on mutable builders, which are not functional.
The only problem with using the Java library directly is that all of its asynchronous calls take an ActionListener (or a
ResponseListener if you're using the low-level client) callback instead of returning anything future-like; they're basically
just callback wrappers around an NIO-based async HTTP client. So we wanted a wrapper that accepts the standard Elasticsearch
query builders but returns a Twitter Future, rather than making us pass in an ActionListener/ResponseListener.
Here's the (simplified) signature of the ES Java async HTTP API:
code:public void performRequestAsync(String method, String endpoint, String body, ResponseListener responseListener)
(The actual interface is sane and takes an HttpEntity instead of a string parameter called body, but I'm simplifying things).
Wrapping this in a Twitter Future is not difficult:
code:import com.twitter.util.{Future => TwitterFuture, Promise => TwitterPromise}
import org.elasticsearch.client.{Response, ResponseListener}

// JavaElasticSearchClient stands for the (simplified) Java REST client shown above.
class ElasticClient(javaClient: JavaElasticSearchClient) {
  def performRequestAsync(method: String, endpoint: String, body: Option[String]): TwitterFuture[Response] = {
    val promise = new TwitterPromise[Response]()
    // The listener's callbacks complete the promise.
    val listener = new ResponseListener {
      override def onFailure(e: Exception): Unit = promise.setException(e)
      override def onSuccess(response: Response): Unit = promise.setValue(response)
    }
    body match {
      case Some(b) => javaClient.performRequestAsync(method, endpoint, b, listener)
      case None => javaClient.performRequestAsync(method, endpoint, listener)
    }
    promise // Promise <: Future, which is why we can return it here.
  }
}
You can see how we simply register callbacks on the listener object that in turn sets the value or exception
within the promise.
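If you want to poke at the listener-to-promise trick without Elasticsearch or Twitter util on the classpath, here's a minimal sketch. Everything in it is a stand-in: `Response`, `ResponseListener` and `FakeClient` are hypothetical mimics of the Java client (the fake one calls the listener immediately instead of doing HTTP), and the standard library's `scala.concurrent.Promise` plays the role of Twitter's `Promise`.

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical stand-ins for the Elasticsearch Java client's callback types.
final case class Response(status: Int, body: String)
trait ResponseListener {
  def onSuccess(response: Response): Unit
  def onFailure(e: Exception): Unit
}

// Hypothetical client: invokes the callback synchronously instead of doing real I/O.
class FakeClient {
  def performRequestAsync(method: String, endpoint: String, listener: ResponseListener): Unit =
    if (endpoint.startsWith("/")) listener.onSuccess(Response(200, s"$method $endpoint"))
    else listener.onFailure(new IllegalArgumentException(s"bad endpoint: $endpoint"))
}

// Adapter: register callbacks that complete a promise, then hand back its future.
def toFuture(client: FakeClient, method: String, endpoint: String): Future[Response] = {
  val promise = Promise[Response]()
  client.performRequestAsync(method, endpoint, new ResponseListener {
    override def onSuccess(response: Response): Unit = promise.success(response)
    override def onFailure(e: Exception): Unit = promise.failure(e)
  })
  promise.future
}

// FakeClient calls back before returning, so both futures are already completed here.
val ok = toFuture(new FakeClient, "GET", "/foo/foo1")
val bad = toFuture(new FakeClient, "GET", "foo")
```

With a real client the callback fires later on an I/O thread, but the adapter is the same: the promise is the only thing the listener touches.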
However, we're type-safe functional programmers and we'd like to provide a type-safe interface that prevents us
from making mistakes such as passing a `GET` with a body. Let's define a few classes to encapsulate the standard requests.
code:case class Request(method: String, endpoint: String, body: Option[String])
case class Get(endpoint: String)
case class Post(endpoint: String, body: String)
case class Delete(endpoint: String)
// etc...
Now we could change performRequestAsync so that it accepts a request, and then overload:
code:def performRequestAsync(javaClient: JavaElasticSearchClient, request: Request): Future[Response] = {
  // the same as above
}
def performRequestAsync(javaClient: JavaElasticSearchClient, request: Get): Future[Response] = {
  performRequestAsync(javaClient, Request("GET", request.endpoint, None))
}
However, overloading is considered poor Scala form for reasons that aren't very exciting. We can use typeclasses instead:
code:trait AsyncRequest[Req] {
  def performRequestAsync(javaClient: JavaElasticSearchClient, request: Req): Future[Response]
}
object AsyncRequest {
  implicit object RequestAsyncRequest extends AsyncRequest[Request] {
    def performRequestAsync(javaClient: JavaElasticSearchClient, request: Request): Future[Response] = {
      val promise = new TwitterPromise[Response]()
      val listener = new ResponseListener {
        override def onFailure(e: Exception): Unit = promise.setException(e)
        override def onSuccess(response: Response): Unit = promise.setValue(response)
      }
      request.body match {
        case Some(b) => javaClient.performRequestAsync(request.method, request.endpoint, b, listener)
        case None => javaClient.performRequestAsync(request.method, request.endpoint, listener)
      }
      promise
    }
  }
  implicit object GetAsyncRequest extends AsyncRequest[Get] {
    def performRequestAsync(javaClient: JavaElasticSearchClient, request: Get): Future[Response] = {
      val promise = new TwitterPromise[Response]()
      val listener = new ResponseListener {
        override def onFailure(e: Exception): Unit = promise.setException(e)
        override def onSuccess(response: Response): Unit = promise.setValue(response)
      }
      javaClient.performRequestAsync("GET", request.endpoint, listener)
      promise
    }
  }
}
// in ElasticClient
def performRequestAsync[R](req: R)(implicit async: AsyncRequest[R]): Future[Response] = {
  async.performRequestAsync(javaClient, req)
}
Now we can do:
code:> val get = Get("/foo/foo1")
> val request = Request("GET", "/foo/foo1", None)
> client.performRequestAsync(request)(AsyncRequest.RequestAsyncRequest)
// or, implicitly:
> client.performRequestAsync(request) // found because `RequestAsyncRequest` lives in AsyncRequest's companion object and satisfies AsyncRequest[Request]
> client.performRequestAsync(get)(AsyncRequest.GetAsyncRequest)
// or, implicitly:
> client.performRequestAsync(get) // likewise, `GetAsyncRequest` satisfies AsyncRequest[Get]
With the typeclass pattern, we can provide ad-hoc extensions to a type. Neat.
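Here's a self-contained version of that dispatch you can paste into a REPL without Elasticsearch or Twitter util around. Assumptions: `FakeClient` is a hypothetical client that just echoes back the method and endpoint it was asked for, and results come back as a stdlib `scala.concurrent.Future` instead of a Twitter Future. The thing to notice is that the compiler picks the instance out of `AsyncRequest`'s companion object based on the argument's static type.

```scala
import scala.concurrent.Future

final case class Response(status: Int, body: String)

// Hypothetical client that echoes the request line back instead of doing HTTP.
class FakeClient {
  def performRequestAsync(method: String, endpoint: String, body: Option[String]): Future[Response] =
    Future.successful(Response(200, s"$method $endpoint${body.fold("")(b => s" $b")}"))
}

case class Request(method: String, endpoint: String, body: Option[String])
case class Get(endpoint: String)

trait AsyncRequest[Req] {
  def performRequestAsync(client: FakeClient, request: Req): Future[Response]
}

object AsyncRequest {
  implicit object RequestAsyncRequest extends AsyncRequest[Request] {
    def performRequestAsync(client: FakeClient, request: Request): Future[Response] =
      client.performRequestAsync(request.method, request.endpoint, request.body)
  }
  implicit object GetAsyncRequest extends AsyncRequest[Get] {
    def performRequestAsync(client: FakeClient, request: Get): Future[Response] =
      client.performRequestAsync("GET", request.endpoint, None)
  }
}

class ElasticClient(client: FakeClient) {
  // The instance is resolved from AsyncRequest's companion object (part of the implicit scope).
  def performRequestAsync[R](req: R)(implicit async: AsyncRequest[R]): Future[Response] =
    async.performRequestAsync(client, req)
}

val client = new ElasticClient(new FakeClient)
val viaGet = client.performRequestAsync(Get("/foo/foo1"))
val viaRequest = client.performRequestAsync(Request("POST", "/foo", Some("{}")))
```

Passing a type with no instance (say, an `Int`) fails at compile time with "could not find implicit value", which is the type-safety we were after.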
Ok, but the typeclass definitions above are pretty verbose. How can we make that easier? You might be thinking "add some
simple helper methods" or "extend from Request", and those answers would be fine, but instead let's do it this dumb
functional way:
You may have noticed that all of our request types can be defined in terms of `Request`, which is the most general of our
request types. Essentially, if we have a way to map from any of our request types to a `Request`, we know how to perform
that request, and don't need to add any more logic. How do we map from one type to another? How about with map!
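Below, we define a method `map` on the `AsyncRequest[Req]` typeclass that takes an existing `AsyncRequest` instance and
constructs a new one. We simply pass a function `A => Req` that takes an A and returns a Req. (Strictly speaking, because
the function points *into* `Req`, this operation is what cats calls `contramap`: `AsyncRequest` is contravariant in `Req`.
We'll keep calling it map.)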
code:trait AsyncRequest[Req] { self =>
  def performRequestAsync(javaClient: JavaElasticSearchClient, request: Req): Future[Response]
  def map[A](f: A => Req): AsyncRequest[A] = new AsyncRequest[A] {
    def performRequestAsync(javaClient: JavaElasticSearchClient, request: A): Future[Response] = {
      self.performRequestAsync(javaClient, f(request))
    }
  }
}
object AsyncRequest {
  implicit object RequestAsyncRequest extends AsyncRequest[Request] {
    // same as above
  }
  // provide a typeclass instance AsyncRequest[Get] by mapping over AsyncRequest[Request]!
  implicit val getAsyncRequest: AsyncRequest[Get] = RequestAsyncRequest.map[Get](get => Request("GET", get.endpoint, None))
  // we can chain this for even higher-level APIs!!
  case class EntityRequest(name: String, id: Int)
  implicit val entityAsyncRequest: AsyncRequest[EntityRequest] = getAsyncRequest.map[EntityRequest](entity => Get(s"/${entity.name}/${entity.id}"))
}
Concretely, assume we've defined an `AsyncRequest[Request]` typeclass instance (as we have). By calling `RequestAsyncRequest.map[Get]` we're constructing
an instance of `AsyncRequest[Get]` in terms of `AsyncRequest[Request]`.
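A runnable version of the derivation, under the same assumptions as before: a hypothetical echoing `FakeClient` and stdlib `Future` in place of Twitter's. Only the `Request` instance ever talks to the client; the `Get` and `EntityRequest` instances are built purely by mapping.

```scala
import scala.concurrent.Future

final case class Response(status: Int, body: String)

// Hypothetical client that echoes the request line back.
class FakeClient {
  def performRequestAsync(method: String, endpoint: String, body: Option[String]): Future[Response] =
    Future.successful(Response(200, s"$method $endpoint"))
}

case class Request(method: String, endpoint: String, body: Option[String])
case class Get(endpoint: String)
case class EntityRequest(name: String, id: Int)

trait AsyncRequest[Req] { self =>
  def performRequestAsync(client: FakeClient, request: Req): Future[Response]

  // Build an AsyncRequest[A] by converting each A into a Req first (contramap in cats terms).
  def map[A](f: A => Req): AsyncRequest[A] = new AsyncRequest[A] {
    def performRequestAsync(client: FakeClient, request: A): Future[Response] =
      self.performRequestAsync(client, f(request))
  }
}

object AsyncRequest {
  implicit val requestAsyncRequest: AsyncRequest[Request] = new AsyncRequest[Request] {
    def performRequestAsync(client: FakeClient, request: Request): Future[Response] =
      client.performRequestAsync(request.method, request.endpoint, request.body)
  }
  implicit val getAsyncRequest: AsyncRequest[Get] =
    requestAsyncRequest.map[Get](get => Request("GET", get.endpoint, None))
  implicit val entityAsyncRequest: AsyncRequest[EntityRequest] =
    getAsyncRequest.map[EntityRequest](e => Get(s"/${e.name}/${e.id}"))
}

def perform[R](client: FakeClient, req: R)(implicit async: AsyncRequest[R]): Future[Response] =
  async.performRequestAsync(client, req)

val entityResult = perform(new FakeClient, EntityRequest("foo", 12))
```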
Now we can do:
code:> val entity = EntityRequest("foo", 12)
> client.performRequestAsync(entity)
Due to the ad hoc nature of typeclasses, we can add support for new request types to `client.performRequestAsync` whenever
we want, even in the REPL:
code:> case class FooRequest(id: Int)
> implicit val fooRequest: AsyncRequest[FooRequest] = implicitly[AsyncRequest[EntityRequest]].map[FooRequest](foo => EntityRequest("foo", foo.id))
> val foo = FooRequest(12)
> client.performRequestAsync(foo)
Which is cool. As you can see, we're able to define a kind of limited class hierarchy in terms of map.
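Because `map` just pre-composes functions, chaining behaves the way you'd hope: mapping with one conversion and then another gives the same requests as a single map with the composed conversion (the contravariant functor composition law). A small check of that, using a hypothetical client-free `ToRequest` typeclass that only reports which `Request` it would send, so nothing asynchronous is involved:

```scala
case class Request(method: String, endpoint: String, body: Option[String])
case class Get(endpoint: String)
case class EntityRequest(name: String, id: Int)
case class FooRequest(id: Int)

// Hypothetical, client-free analogue of AsyncRequest: it only reports the Request it would send.
trait ToRequest[Req] { self =>
  def toRequest(req: Req): Request
  def map[A](f: A => Req): ToRequest[A] = new ToRequest[A] {
    def toRequest(req: A): Request = self.toRequest(f(req))
  }
}

val base: ToRequest[Request] = new ToRequest[Request] { def toRequest(req: Request): Request = req }

val getToRequest: Get => Request = g => Request("GET", g.endpoint, None)
val entityToGet: EntityRequest => Get = e => Get(s"/${e.name}/${e.id}")
val fooToEntity: FooRequest => EntityRequest = f => EntityRequest("foo", f.id)

// Three separate maps, like the Get -> EntityRequest -> FooRequest chain above.
val chained: ToRequest[FooRequest] = base.map(getToRequest).map(entityToGet).map(fooToEntity)
// One map whose function performs all three conversions (compose applies right to left).
val composed: ToRequest[FooRequest] = base.map(getToRequest compose entityToGet compose fooToEntity)
```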
So, if the plan is to build higher level APIs, we probably want a way to manipulate the response as well as the request.
That's a simple extension of what we've already done, parameterizing over the Response:
code:trait AsyncRequest[Req, Rep] { self =>
  def performRequestAsync(javaClient: JavaElasticSearchClient, request: Req): Future[Rep]
  def map[A, B](f: A => Req, g: Rep => B): AsyncRequest[A, B] = new AsyncRequest[A, B] {
    def performRequestAsync(javaClient: JavaElasticSearchClient, request: A): Future[B] = {
      self.performRequestAsync(javaClient, f(request)).map(g)
    }
  }
}
object AsyncRequest {
  implicit object RequestAsyncRequest extends AsyncRequest[Request, Response] {
    // same as above.
  }
  // assuming some response class GetResponse, provide a typeclass instance AsyncRequest[Get, GetResponse] by mapping over
  // AsyncRequest[Request, Response]!
  implicit val getAsyncRequest: AsyncRequest[Get, GetResponse] = RequestAsyncRequest.map[Get, GetResponse](
    get => Request("GET", get.endpoint, None),
    rep => GetResponse(rep.field1, ...) // do stuff to build a get response
  )
}
// in ElasticClient
def performRequestAsync[Req, Rep](request: Req)(implicit async: AsyncRequest[Req, Rep]): Future[Rep] = {
  async.performRequestAsync(javaClient, request)
}
This is pretty cool: we can implement all of the HTTP methods in a snap this way (I'm not going to).
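A dependency-free sketch of the two-function map, with `f` adapting the request on the way in and `g` adapting the response on the way out. Assumptions: `FakeClient` is hypothetical (200 for `/foo/*`, 404 otherwise), `GetResponse` is a made-up typed response, stdlib `Future` replaces Twitter's, and a same-thread `ExecutionContext` keeps the example deterministic (stdlib `Future.map` needs one; Twitter's doesn't).

```scala
import scala.concurrent.{ExecutionContext, Future}

final case class Response(status: Int, body: String)
case class Request(method: String, endpoint: String, body: Option[String])
case class Get(endpoint: String)
// Hypothetical typed response; a real one would parse the JSON body.
case class GetResponse(found: Boolean, source: String)

// Runs callbacks on the calling thread so results are visible immediately (sketch only).
implicit val sameThread: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

// Hypothetical client: 200 for /foo/*, 404 otherwise.
class FakeClient {
  def performRequestAsync(method: String, endpoint: String, body: Option[String]): Future[Response] =
    Future.successful(
      if (endpoint.startsWith("/foo/")) Response(200, s"doc at $endpoint") else Response(404, "")
    )
}

trait AsyncRequest[Req, Rep] { self =>
  def performRequestAsync(client: FakeClient, request: Req): Future[Rep]

  // f adapts the request on the way in, g adapts the response on the way out.
  def map[A, B](f: A => Req, g: Rep => B): AsyncRequest[A, B] = new AsyncRequest[A, B] {
    def performRequestAsync(client: FakeClient, request: A): Future[B] =
      self.performRequestAsync(client, f(request)).map(g)
  }
}

object AsyncRequest {
  implicit val requestAsyncRequest: AsyncRequest[Request, Response] = new AsyncRequest[Request, Response] {
    def performRequestAsync(client: FakeClient, request: Request): Future[Response] =
      client.performRequestAsync(request.method, request.endpoint, request.body)
  }
  implicit val getAsyncRequest: AsyncRequest[Get, GetResponse] = requestAsyncRequest.map[Get, GetResponse](
    get => Request("GET", get.endpoint, None),
    rep => GetResponse(rep.status == 200, rep.body)
  )
}

// Rep is not mentioned by the arguments; the compiler fills it in from the implicit it finds.
def perform[Req, Rep](client: FakeClient, req: Req)(implicit async: AsyncRequest[Req, Rep]): Future[Rep] =
  async.performRequestAsync(client, req)

val hit = perform(new FakeClient, Get("/foo/1"))
val miss = perform(new FakeClient, Get("/bar/1"))
```

In cats terms this two-sided map is `dimap`, which makes `AsyncRequest` a profunctor; the name doesn't matter much, but it explains why the request side and the response side go in opposite directions.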
Now we have an amazing wrapper for elastic search, and we want people to use it. But we have the same problem
elastic4s has: We've constrained ourselves to a concrete implementation of Future, and all of our users are stuck
using our Future type. If only there were some way to abstract over the concept of a Future, so that our interface
can return an arbitrary Future implementation. Let's do it.
First, let's rewrite our code so that it accepts an AsyncExecutor typeclass, which is responsible for converting
our response listener into a future:
code:trait AsyncExecutor {
  def fromListener(f: ResponseListener => Unit): TwitterFuture[Response]
}
object TwitterFutureExecutor {
  implicit object TwitterFutureAsyncExecutor extends AsyncExecutor {
    def fromListener(f: ResponseListener => Unit): TwitterFuture[Response] = {
      val promise = new TwitterPromise[Response]()
      f(new ResponseListener {
        override def onFailure(e: Exception): Unit = promise.setException(e)
        override def onSuccess(response: Response): Unit = promise.setValue(response)
      })
      promise
    }
  }
}
// Rewrite our AsyncRequest typeclass in terms of the AsyncExecutor typeclass:
trait AsyncRequest[Req, Rep] {
  def performRequestAsync(client: JavaElasticSearchClient, request: Req)(implicit executor: AsyncExecutor): TwitterFuture[Rep]
  // map is unchanged
}
implicit object RequestAsyncRequest extends AsyncRequest[Request, Response] {
  override def performRequestAsync(client: JavaElasticSearchClient,
                                   req: Request)(implicit executor: AsyncExecutor): TwitterFuture[Response] = {
    val async = req.body match {
      case Some(body) =>
        client.performRequestAsync(
          req.method,
          req.endpoint,
          body,
          _: ResponseListener
        )
      case None =>
        client.performRequestAsync(req.method, req.endpoint, _: ResponseListener)
    }
    executor.fromListener(async)
  }
}
Here, we partially apply the `client.performRequestAsync` method and turn it into a function that takes a ResponseListener
and returns Unit (`ResponseListener => Unit`). We pass this partially applied function into the `fromListener` method
defined on the `AsyncExecutor` typeclass instance, which returns a `TwitterFuture[Response]`. However, we haven't achieved our
goal: we want to be able to define this for any Future (or really, anything that can do something meaningful with a
ResponseListener). While our rewrite allows us to provide custom implementations of our `AsyncExecutor` typeclass, the
signature of `fromListener` is still constrained to a concrete implementation of Future.
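The partial-application step on its own, as a sketch you can run without any libraries. Assumptions: `Response`, `ResponseListener` and the two-overload `FakeClient` are hypothetical mimics of the Java client, `FakeClient` answers synchronously, and the stdlib `Promise` stands in for Twitter's inside `fromListener`.

```scala
import scala.concurrent.{Future, Promise}

final case class Response(status: Int, body: String)
trait ResponseListener {
  def onSuccess(response: Response): Unit
  def onFailure(e: Exception): Unit
}
case class Request(method: String, endpoint: String, body: Option[String])

// Hypothetical client with the two overloads used in the post; it answers synchronously.
class FakeClient {
  def performRequestAsync(method: String, endpoint: String, body: String, listener: ResponseListener): Unit =
    listener.onSuccess(Response(200, s"$method $endpoint with $body"))
  def performRequestAsync(method: String, endpoint: String, listener: ResponseListener): Unit =
    listener.onSuccess(Response(200, s"$method $endpoint"))
}

// Turns "something that wants a listener" into a Future.
def fromListener(register: ResponseListener => Unit): Future[Response] = {
  val promise = Promise[Response]()
  register(new ResponseListener {
    override def onSuccess(response: Response): Unit = promise.success(response)
    override def onFailure(e: Exception): Unit = promise.failure(e)
  })
  promise.future
}

def perform(client: FakeClient, req: Request): Future[Response] = {
  // Each branch leaves only the listener slot open, so both have type ResponseListener => Unit.
  val register: ResponseListener => Unit = req.body match {
    case Some(body) => client.performRequestAsync(req.method, req.endpoint, body, _: ResponseListener)
    case None       => client.performRequestAsync(req.method, req.endpoint, _: ResponseListener)
  }
  fromListener(register)
}

val withBody = perform(new FakeClient, Request("POST", "/foo", Some("{}")))
val noBody = perform(new FakeClient, Request("GET", "/foo/1", None))
```

The type ascription `_: ResponseListener` is what lets the compiler pick the right overload and build the function before any listener exists.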
Let's take another look at our `AsyncExecutor` trait:
code:trait AsyncExecutor {
def fromListener(f: ResponseListener => Unit): TwitterFuture[Response]
}
What we want is to replace TwitterFuture with a type parameter; let's try it:
code:trait AsyncExecutor[F] {
def fromListener(f: ResponseListener => Unit): F
}
This doesn't make sense: the return type of `fromListener` was `TwitterFuture[Response]`, and substituting `F` for it means that
the return type of `fromListener` is simply some `F`, which says nothing about a Response! However, if we try to change it so that
it returns an `F[Response]`, we cannot, because the trait declares `F` as a plain type: we need to return an `F`, not an `F[Response]`.
Is there a way to change the return type of our trait to `F[Response]`? In fact there is!
code:trait AsyncExecutor[F[_]] {
def fromListener(f: ResponseListener => Unit): F[Response]
}
Now we're using higher kinded types.
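For our purposes, `F[_]` is a type parameter that stands for a type constructor: a type that itself takes a type parameter,
such as Option, List, or, indeed, Future. A type like `AsyncExecutor`, which is parameterized over such a constructor, is
higher-kinded. However, I'm going to do a bad job explaining this, so read up here: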
https://typelevel.github.io/blog/2016/08/21/hkts-moving-forward.html
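A tiny illustration of abstracting over a type constructor, independent of Elasticsearch: `Pure` is a hypothetical typeclass (cats has a similar `Applicative.pure`), and the same function returns an `Option` or a `List` depending on which `F` the caller picks. Note that the instances are for `Option` and `List` themselves, not for `Option[Int]` or `List[String]`.

```scala
// A typeclass over type constructors: F is Option or List, not Option[Int] or List[String].
trait Pure[F[_]] {
  def pure[A](a: A): F[A]
}

object Pure {
  implicit val optionPure: Pure[Option] = new Pure[Option] { def pure[A](a: A): Option[A] = Some(a) }
  implicit val listPure: Pure[List] = new Pure[List] { def pure[A](a: A): List[A] = List(a) }
}

// Written once; the caller decides which container comes back.
def wrapResponse[F[_]](status: Int)(implicit P: Pure[F]): F[String] = P.pure(s"status $status")

val asOption: Option[String] = wrapResponse[Option](200)
val asList: List[String] = wrapResponse[List](404)
```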
In order to make this useful, we need to constrain it a bit by saying "any type which can do something
meaningful with a ResponseListener." Let's see what a concrete implementation looks like:
code:object TwitterFutureExecutor {
  implicit object TwitterFutureAsyncExecutor extends AsyncExecutor[TwitterFuture] {
    def fromListener(f: ResponseListener => Unit): TwitterFuture[Response] = {
      val promise = new TwitterPromise[Response]()
      f(new ResponseListener {
        override def onFailure(e: Exception): Unit = promise.setException(e)
        override def onSuccess(response: Response): Unit = promise.setValue(response)
      })
      promise
    }
  }
}
// Rewrite our AsyncRequest typeclass in terms of the AsyncExecutor typeclass:
trait AsyncRequest[Req, Rep] { self =>
  def performRequestAsync[F[_]](client: JavaElasticSearchClient, request: Req)(implicit executor: AsyncExecutor[F]): F[Rep]
}
implicit object RequestAsyncRequest extends AsyncRequest[Request, Response] {
  override def performRequestAsync[F[_]](client: JavaElasticSearchClient,
                                         req: Request)(implicit executor: AsyncExecutor[F]): F[Response] = {
    val async = req.body match {
      case Some(body) =>
        client.performRequestAsync(
          req.method,
          req.endpoint,
          body,
          _: ResponseListener
        )
      case None =>
        client.performRequestAsync(req.method, req.endpoint, _: ResponseListener)
    }
    executor.fromListener(async)
  }
}
Now our performRequestAsync method is parameterized over some higher-kinded type that has an instance of the AsyncExecutor
typeclass available. For fun, let's provide an instance of our AsyncExecutor typeclass for the cats-effect IO monad:
code:import cats.effect.IO

// IO.async as in cats-effect 1.x/2.x; in cats-effect 3 the equivalent is IO.async_.
implicit object IOExecutor extends AsyncExecutor[IO] {
  def fromListener(f: ResponseListener => Unit): IO[Response] = {
    IO.async { cb =>
      f(new ResponseListener {
        override def onFailure(e: Exception): Unit = cb(Left(e))
        override def onSuccess(response: Response): Unit = cb(Right(response))
      })
    }
  }
}
Now, depending on which executor is in scope, our `client.performRequestAsync[Req, Rep, F[_]: AsyncExecutor]` method will
return either an `IO[Rep]` or a `Future[Rep]`. Wow!
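To check that the same request code really does come back in different wrappers depending on the executor, here's a dependency-free sketch. Assumptions: a hypothetical synchronous `FakeClient`, stdlib `Future` standing in for Twitter's, and a hypothetical `Attempt` (just `Either[Throwable, A]`) standing in for `IO`. The `Attempt` executor only works because the fake client calls back before `register` returns; it is not a real async effect.

```scala
import scala.concurrent.{Future, Promise}

final case class Response(status: Int, body: String)
trait ResponseListener {
  def onSuccess(response: Response): Unit
  def onFailure(e: Exception): Unit
}

// Hypothetical client that calls back synchronously.
class FakeClient {
  def performRequestAsync(method: String, endpoint: String, listener: ResponseListener): Unit =
    listener.onSuccess(Response(200, s"$method $endpoint"))
}

trait AsyncExecutor[F[_]] {
  def fromListener(register: ResponseListener => Unit): F[Response]
}

object AsyncExecutor {
  // Future-backed executor (stdlib Promise standing in for Twitter's).
  implicit val futureExecutor: AsyncExecutor[Future] = new AsyncExecutor[Future] {
    def fromListener(register: ResponseListener => Unit): Future[Response] = {
      val promise = Promise[Response]()
      register(new ResponseListener {
        def onSuccess(r: Response): Unit = promise.success(r)
        def onFailure(e: Exception): Unit = promise.failure(e)
      })
      promise.future
    }
  }
}

// Hypothetical synchronous effect: only valid when the callback fires before register returns.
type Attempt[A] = Either[Throwable, A]
implicit val attemptExecutor: AsyncExecutor[Attempt] = new AsyncExecutor[Attempt] {
  def fromListener(register: ResponseListener => Unit): Attempt[Response] = {
    var result: Attempt[Response] = Left(new IllegalStateException("listener was never called"))
    register(new ResponseListener {
      def onSuccess(r: Response): Unit = result = Right(r)
      def onFailure(e: Exception): Unit = result = Left(e)
    })
    result
  }
}

// One definition; the executor the caller asks for decides the return type.
def getDoc[F[_]](client: FakeClient, endpoint: String)(implicit executor: AsyncExecutor[F]): F[Response] =
  executor.fromListener(client.performRequestAsync("GET", endpoint, _: ResponseListener))

val asFuture: Future[Response] = getDoc[Future](new FakeClient, "/foo/1")
val asAttempt: Attempt[Response] = getDoc[Attempt](new FakeClient, "/foo/1")
```

A cats-effect `IO` executor like the one above would slot in the same way, as one more instance of `AsyncExecutor`.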
However, there's a problem:
code:trait AsyncRequest[Req, Rep] { self =>
  def performRequestAsync[F[_]](client: JavaElasticSearchClient, request: Req)(implicit executor: AsyncExecutor[F]): F[Rep]
  def map[A, B](f: A => Req, g: Rep => B): AsyncRequest[A, B] = new AsyncRequest[A, B] {
    def performRequestAsync[F[_]](client: JavaElasticSearchClient, req: A)(implicit async: AsyncExecutor[F]): F[B] = {
      self.performRequestAsync(client, f(req)).map(g) // wait, what?
    }
  }
}
`self.performRequestAsync(...)` returns an `F[Rep]`; how do we map over `F[_]`? We can't, unless we constrain
our `F[_]` to things that can be mapped over. Things... that can be mapped over. That sounds familiar... What do we call
things that we can map over? I'm pretty sure I've read that Functors are things that can be mapped over. Let's try that.
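What that constraint buys us, in isolation: a hypothetical hand-rolled `Functor` (the real one is `cats.Functor`, with the same curried `map(fa)(f)` shape) lets a function transform whatever is inside any `F` without knowing what `F` is.

```scala
// Hand-rolled stand-in for cats.Functor; map is curried like the cats version.
trait Functor[F[_]] {
  def map[A, B](fa: F[A])(f: A => B): F[B]
}

object Functor {
  implicit val optionFunctor: Functor[Option] = new Functor[Option] {
    def map[A, B](fa: Option[A])(f: A => B): Option[B] = fa.map(f)
  }
  implicit val listFunctor: Functor[List] = new Functor[List] {
    def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
  }
}

// Knows nothing about F except that it can be mapped over.
def statusLine[F[_]](codes: F[Int])(implicit functor: Functor[F]): F[String] =
  functor.map(codes)(code => if (code < 400) s"$code ok" else s"$code error")

val one = statusLine(Option(200))
val many = statusLine(List(200, 404))
```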
code:import cats.Functor

trait AsyncRequest[Req, Rep] { self =>
  def performRequestAsync[F[_]](client: JavaElasticSearchClient, request: Req)(implicit executor: AsyncExecutor[F], functor: Functor[F]): F[Rep]
  def map[A, B](f: A => Req, g: Rep => B): AsyncRequest[A, B] = new AsyncRequest[A, B] {
    def performRequestAsync[F[_]](client: JavaElasticSearchClient, req: A)(implicit async: AsyncExecutor[F], functor: Functor[F]): F[B] = {
      functor.map(self.performRequestAsync[F](client, f(req)))(g) // cats' Functor#map is curried
    }
  }
}
Wow!!!!!!!!!!! Functional programming is great! Now our client looks like this:
code:def performRequestAsync[Req, Rep, F[_]](req: Req)(
  implicit
  async: AsyncRequest[Req, Rep],
  executor: AsyncExecutor[F],
  functor: Functor[F]
): F[Rep] = {
  async.performRequestAsync[F](javaClient, req)
}
and that's how you massively overengineer a REST client.
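For the skeptical, here is the whole contraption squeezed into one self-contained file with no Elasticsearch, Twitter util or cats dependency. It's a sketch, not the real thing: `FakeClient` is a hypothetical synchronous client, `Functor` is a hand-rolled stand-in for `cats.Functor`, and `Attempt` is a hypothetical synchronous effect type used in place of a real Future or IO so the result can be inspected immediately.

```scala
final case class Response(status: Int, body: String)
trait ResponseListener {
  def onSuccess(response: Response): Unit
  def onFailure(e: Exception): Unit
}
case class Request(method: String, endpoint: String, body: Option[String])
case class Get(endpoint: String)
case class GetResponse(found: Boolean) // hypothetical typed response

// Hypothetical client that answers synchronously: 200 for /foo/*, 404 otherwise.
class FakeClient {
  def performRequestAsync(method: String, endpoint: String, listener: ResponseListener): Unit =
    listener.onSuccess(Response(if (endpoint.startsWith("/foo/")) 200 else 404, ""))
}

trait Functor[F[_]] { def map[A, B](fa: F[A])(f: A => B): F[B] }
trait AsyncExecutor[F[_]] { def fromListener(register: ResponseListener => Unit): F[Response] }

trait AsyncRequest[Req, Rep] { self =>
  def performRequestAsync[F[_]](client: FakeClient, request: Req)(
      implicit executor: AsyncExecutor[F], functor: Functor[F]): F[Rep]

  def map[A, B](f: A => Req, g: Rep => B): AsyncRequest[A, B] = new AsyncRequest[A, B] {
    def performRequestAsync[F[_]](client: FakeClient, request: A)(
        implicit executor: AsyncExecutor[F], functor: Functor[F]): F[B] =
      functor.map(self.performRequestAsync[F](client, f(request)))(g)
  }
}

object AsyncRequest {
  implicit val requestAsyncRequest: AsyncRequest[Request, Response] = new AsyncRequest[Request, Response] {
    def performRequestAsync[F[_]](client: FakeClient, request: Request)(
        implicit executor: AsyncExecutor[F], functor: Functor[F]): F[Response] =
      // This fake client ignores bodies; only the method and endpoint are passed on.
      executor.fromListener(client.performRequestAsync(request.method, request.endpoint, _: ResponseListener))
  }
  implicit val getAsyncRequest: AsyncRequest[Get, GetResponse] =
    requestAsyncRequest.map[Get, GetResponse](g => Request("GET", g.endpoint, None), r => GetResponse(r.status == 200))
}

// Hypothetical synchronous effect type with the two instances the client needs.
type Attempt[A] = Either[Throwable, A]
implicit val attemptFunctor: Functor[Attempt] = new Functor[Attempt] {
  def map[A, B](fa: Attempt[A])(f: A => B): Attempt[B] = fa.map(f)
}
implicit val attemptExecutor: AsyncExecutor[Attempt] = new AsyncExecutor[Attempt] {
  def fromListener(register: ResponseListener => Unit): Attempt[Response] = {
    var result: Attempt[Response] = Left(new IllegalStateException("listener was never called"))
    register(new ResponseListener {
      def onSuccess(r: Response): Unit = result = Right(r)
      def onFailure(e: Exception): Unit = result = Left(e)
    })
    result
  }
}

class ElasticClient(javaClient: FakeClient) {
  def performRequestAsync[Req, Rep, F[_]](req: Req)(
      implicit async: AsyncRequest[Req, Rep], executor: AsyncExecutor[F], functor: Functor[F]): F[Rep] =
    async.performRequestAsync[F](javaClient, req)
}

val client = new ElasticClient(new FakeClient)
val found = client.performRequestAsync[Get, GetResponse, Attempt](Get("/foo/1"))
val missing = client.performRequestAsync[Get, GetResponse, Attempt](Get("/bar/1"))
```

Swapping in a Future- or IO-backed executor, plus a `Functor` for it, changes the return type without touching any `AsyncRequest` instance, which is the whole point of the exercise.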
DONT THREAD ON ME fucked around with this message at 15:25 on Dec 12, 2017