Thursday, September 24, 2009

Develop a Memcached Client (3) - Distributed Hash Table

This is continuation of the network development in PLT post. You can see the previous posts for more details:
  1. network development overview
  2. memcached client - storage API
  3. memcached client - retrieval and deletion API, and the rest
At this point we have a functional memcached client, but since people generally use memcached like a distributed hash table, what we have is not yet sufficient - we need a frontend to multiple instances of memcached.

The idea is simple - have a structure holding multiple memcached clients:

(define-struct memcached/dht-client (clients)) ;; clients is a vector of memcached clients.
Then we need to ensure that the key gets consistently mapped to the client that holds the data, and ensure the keys are distributed uniformly.

Uniform Hash Distribution

While the concept is pretty straight forward, the algorithm behind hashing is non-trivial. A good string hash function that I found is djb2, with the below as the scheme implementation:

(define (djb2-hash key)
  (define (convert key)
    (cond ((string? key) (convert (string->bytes/utf-8 key)))
          ((bytes? key) (bytes->list key))
          ((symbol? key) (convert (symbol->string key)))))
  (define (helper bytes hash)
    (cond ((null? bytes) hash)
          (else
           (helper (cdr bytes) (+ (* hash 33) (car bytes))))))
  (helper (convert key) 5381))
Once we convert the key into a hash code, we just need to take the remainder against the number of available memcached instances:

;; get the hash code 
(define (memcached/dht-hash client key)
  (remainder (djb2-hash key) (vector-length (memcached/dhs-client-clients client))))
As long as the count and the order of the clients remain the same, we will hash the key to the same client:

;; return the particular client by the key 
(define (memcached/dht-target client key)
  (vector-ref (memcached/dht-client-clients client)
              (memcached/dht-hash client key)))
With them, we can now wrap around all of our API's - we basically maintain the same interface as the single instance API, except that we are passing in the dhs client:


(define (memcached/dht-set! client key value #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-set! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-add! client key value #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-add! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-replace! client key value #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-replace! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-append! client key value #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-append! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-prepend! client key value #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-prepend! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-cas! client key value cas #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-cas! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-delete! client key (delay 0) (noreply? #f))
  (memcached-delete! (memcached/dht-target client key) key delay noreply?))

(define (memcached/dht-incr! client key val (noreply? #f))
  (memcached-incr! (memcached/dht-target client key) key val noreply?))

(define (memcached/dht-decr! client key val (noreply? #f))
  (memcached-decr! (memcached/dht-target client key) key val noreply?))

All the setters are straight forward - hash the key to the underlying memcached client, and then call the corresponding single-instance methods on the parameters.

The getters are a bit trickier - since the getters can take in multiple keys as the parameter, and each of the keys might hash to a different underlying client, we should try to group the keys by their hash, and the pass the keys to the correct target, and finally merge the responses:

(define (group alist)
  ;; for each alist with the same key - group them together!!
  (foldl (lambda (kv interim)
           (if-it (assoc (car kv) interim) ;; the key already exists...
               (cons (cons (car it) (cons (cdr kv) (cdr it)))
                     (filter (lambda (kv)
                               (not (equal? it kv))) interim))
               (cons (list (car kv) (cdr kv)) interim)))
         '()
         alist))

(define (memcached/dht-retrieve* type client keys)
  (apply append (map (lambda (kv)
                       (apply type (car kv) (cdr kv)))
                     (group (map cons  
                                 (map (lambda (key)
                                        (memcached/dht-target client key))
                                      keys)
                                 keys)))))

(define (memcached/dht-get client key . keys)
  (memcached/dht-retrieve* memcached-get client (cons key keys)))

(define (memcached/dht-gets client key . keys)
  (memcached/dht-retrieve* memcached-gets client (cons key keys)))

And finally - memcached/dht-flush-all! does not require hashing; it simply calls the underlying flush-all command for each of the client. But according to the memcached's spec, it is best to stagger the flushing of the instances, so the database will not all of the sudden be overloaded. This pattern is directly built into the memcached/dht-flush-all!, and you can control the staggering by passing in an interval value that determines the gaps between the flushes.

(define (memcached/dht-flush-all! client (interval 10) (noreply? #f))
  (for-each (lambda (client i)
              (memcached-flush-all! client (* interval i) noreply?))
            (vector->list (memcached/dht-client-clients client))
            (build-list (vector-length (memcached/dht-client-clients client)) add1)))
We are almost there - our last stop is to integrate with DBI. Stay tuned.

No comments:

Post a Comment