Thursday, September 24, 2009

Develop a Memcached Client (4) - BZLIB/DBD-MEMCACHED

This is the "dramatic" conclusion of the memcached client development. You can find additional details in the previous posts:
  1. network development overview
  2. memcached client - storage API
  3. memcached client - retrieval and deletion API, and the rest
  4. distributed hash table frontend for memcached

All right - it's not that dramatic, but I've made the memcached client available as a DBI driver via planet, so you can use it through the DBI interface.

The nicest thing about DBI is how easy it is to extend. You can - say - create your own driver that wraps both the memcached driver and the postgresql driver, so you do not have to sprinkle calls to both drivers throughout your code.

You'll find the tutorial on using bzlib/dbd-memcached after the integration section.

Integrate with DBI

The last step is to integrate both the single instance and the distributed hashtable instance into DBI, so we can use them through the DBI interface.

As you may remember from the "create a DBI driver" series, we basically have to create the following functions and register them as a driver with DBI:
  • connect - wrap around the creation of the connection
  • disconnect - disconnect the underlying connection
  • prepare - for memcached it's a NOOP
  • query - the majority of work is here
  • begin-trans (optional) - for memcached it's a NOOP
  • commit (optional) - for memcached it's a NOOP
  • rollback (optional) - for memcached it's a NOOP
Below is the code that registers the single instance memcached client - integration of memcached/dht is left as an exercise:

(define (m-connect driver host port)
  (make-handle driver (memcached-connect host port) (make-immutable-hash-registry) 0))

(define (m-disconnect handle)
  (memcached-disconnect (handle-conn handle)))

(define (m-prepare handle stmt)
  (void)) 

(define (make-query set! add! replace! append! prepend! cas! get gets delete! incr! decr! flush-all!)
  (lambda (handle stmt (args '()))
    (let ((client (handle-conn handle)))
      (case stmt 
        ((set! add! replace! append! prepend!)
         (let/assert! ((key (assoc/cdr 'key args))
                       (value (assoc/cdr 'value args))
                       (flags (assoc/cdr 'flags args 0))
                       (exp-time (assoc/cdr 'exp-time args 0)))
                      ((case stmt
                         ((set!) set!)
                         ((add!) add!)
                         ((replace!) replace!)
                         ((append!) append!)
                         ((prepend!) prepend!))
                       client key value 
                       #:exp-time exp-time #:flags flags 
                       #:noreply? (assoc/cdr 'noreply? args))))
        ((cas!)
         (let/assert! ((key (assoc/cdr 'key args))
                       (value (assoc/cdr 'value args))
                       (cas (assoc/cdr 'cas args))
                       (flags (assoc/cdr 'flags args 0))
                       (exp-time (assoc/cdr 'exp-time args 0)))
                      (cas! client key value cas
                            #:exp-time exp-time #:flags flags
                            #:noreply? (assoc/cdr 'noreply? args))))
        ((get gets)
         (cons (list "key" "value" "flags" "cas")
               (apply (case stmt
                        ((get) get)
                        ((gets) gets))
                      client (map cdr (filter (lambda (kv)
                                                (equal? (car kv) 'key))
                                              args)))))
        ((delete!)
         (let/assert! ((key (assoc/cdr 'key args))
                       (delay (assoc/cdr 'delay args 0)))
                      (delete! client key delay (assoc/cdr 'noreply? args))))
        ((incr! decr!)
         (let/assert! ((key (assoc/cdr 'key args))
                       (value (assoc/cdr 'value args)))
                      ((case stmt
                         ((incr!) incr!)
                         ((decr!) decr!))
                       client key value (assoc/cdr 'noreply? args))))
        ((flush-all!) 
         (let/assert! ((delay (assoc/cdr 'delay args 10)))
                      (flush-all! client delay (assoc/cdr 'noreply? args))))
        (else
         (error 'query "invalid stmt: ~a" stmt))))))

(define m-query 
  (make-query memcached-set!
              memcached-add!
              memcached-replace!
              memcached-append!
              memcached-prepend!
              memcached-cas!
              memcached-get
              memcached-gets
              memcached-delete!
              memcached-incr!
              memcached-decr!
              memcached-flush-all!))

(define (no-begin handle) (void))

(define (no-commit handle) (void))

(define (no-rollback handle) (void))

(registry-set! drivers 'memcached
               (make-driver m-connect
                            m-disconnect
                            m-query
                            m-prepare
                            no-begin
                            no-commit
                            no-rollback))
The code is now available as bzlib/dbd-memcached via planet under LGPL. Below is a quick tutorial on usage.

Prerequisite

You must have memcached installed for your platform and make sure it is up and running (and that the firewall is configured so you can reach the memcached server).

Installation

(require (planet bzlib/dbi) (planet bzlib/dbd-memcached)) 
;; single instance 
(define h (connect 'memcached "localhost" 11211)) 
;; dht instance 
(define h (connect 'memcached/dht "localhost" `("192.168.0.1" . 9911) 3372 ...))
;; disconnect
(disconnect h) 
The dht instance takes a variable number of addresses, each of which is either a string (the hostname, with the port defaulting to 11211), a number (the port number, with the host defaulting to "localhost"), or a pair of hostname and port number. So the above reads localhost:11211, 192.168.0.1:9911, and localhost:3372.
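
To make the three address forms concrete, here is a minimal sketch in modern Racket (the successor to PLT Scheme). The names normalize-address and normalize-addresses are hypothetical helpers of my own, not part of bzlib/dbd-memcached, and the driver may well handle this differently internally; the sketch only shows how each form could be turned into a (host . port) pair.

```racket
#lang racket/base
;; Hypothetical helpers (not part of bzlib/dbd-memcached): normalize the
;; three accepted address forms into (host . port) pairs.
(define default-host "localhost")
(define default-port 11211)

(define (normalize-address addr)
  (cond ((string? addr)                          ; hostname only
         (cons addr default-port))
        ((exact-nonnegative-integer? addr)       ; port only
         (cons default-host addr))
        ((and (pair? addr)                       ; (hostname . port)
              (string? (car addr))
              (exact-nonnegative-integer? (cdr addr)))
         addr)
        (else
         (error 'normalize-address "invalid address: ~a" addr))))

(define (normalize-addresses . addrs)
  (map normalize-address addrs))

(normalize-addresses "localhost" '("192.168.0.1" . 9911) 3372)
;; => '(("localhost" . 11211) ("192.168.0.1" . 9911) ("localhost" . 3372))
```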

Storage Queries

;; storage
(query h 'set! `((key . <key>) (value . #"value") (flags . <flags>) (exp-time . <exp-time>) (noreply? . <#t|#f>)))
(query h 'add! `((key . <key>) (value . #"value") (flags . <flags>) (exp-time . <exp-time>) (noreply? . <#t|#f>)))
(query h 'replace! `((key . <key>) (value . #"value") (flags . <flags>) (exp-time . <exp-time>) (noreply? . <#t|#f>)))
(query h 'append! `((key . <key>) (value . #"value") (flags . <flags>) (exp-time . <exp-time>) (noreply? . <#t|#f>)))
(query h 'prepend! `((key . <key>) (value . #"value") (flags . <flags>) (exp-time . <exp-time>) (noreply? . <#t|#f>)))
(query h 'cas! `((key . <key>) (value . #"value") (cas . <cas>) (flags . <flags>) (exp-time . <exp-time>) (noreply? . <#t|#f>)))
key and value are required. The key can be a symbol, string, or bytes, as long as it contains no whitespace. The value needs to be bytes. flags (defaults to 0), exp-time (defaults to 0), and noreply? (defaults to false) are all optional. cas is required with the 'cas! query.
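
If the defaulting rules are not obvious, the sketch below shows the idea with a plain association list. alist-ref is a hypothetical stand-in that I am assuming behaves like the assoc/cdr helper used in the driver code (look up a key, fall back to a default); storage-args is likewise only for illustration.

```racket
#lang racket/base
;; Hypothetical stand-in for assoc/cdr: return the value stored under key,
;; or default when the key is absent.
(define (alist-ref key alist (default #f))
  (let ((pair (assoc key alist)))
    (if pair (cdr pair) default)))

;; Resolve the arguments of a storage query, applying the documented
;; defaults: flags 0, exp-time 0, noreply? false.
(define (storage-args args)
  (list (alist-ref 'key args)
        (alist-ref 'value args)
        (alist-ref 'flags args 0)
        (alist-ref 'exp-time args 0)
        (alist-ref 'noreply? args)))

;; Only key and value are given; everything else falls back to its default.
(storage-args `((key . "greeting") (value . #"hello")))
;; => '("greeting" #"hello" 0 0 #f)
```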

Retrieval Queries

(query h 'get `((key . <key>) ...)) 
(query h 'gets `((key . <key>) ...)) 
Both take multiple keys, and both return a recordset containing the keys, values, flags, and possibly the cas (false if using 'get, and integers if using 'gets).
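
Passing several keys just means repeating the key entry in the argument list. The sketch below mirrors the filtering expression from the driver code to show how the keys get collected, and then walks a recordset. The header row comes from the driver code above; the shape and contents of the data rows are my assumption for illustration, not captured output.

```racket
#lang racket/base
;; Collect every value stored under 'key, in order (same expression the
;; driver uses before calling get/gets).
(define (collect-keys args)
  (map cdr (filter (lambda (kv) (equal? (car kv) 'key)) args)))

(collect-keys '((key . "a") (key . "b") (noreply? . #f)))
;; => '("a" "b")

;; A made-up recordset as 'get might return it: a header row followed by
;; one row per key. The row layout is assumed, not taken from real output.
(define sample-recordset
  (list (list "key" "value" "flags" "cas")
        (list "a" #"1" 0 #f)
        (list "b" #"2" 0 #f)))

;; Skip the header and pull out just the values.
(define (recordset-values rs)
  (map cadr (cdr rs)))

(recordset-values sample-recordset)
;; => '(#"1" #"2")
```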

Deletion Query

(query h 'delete! `((key . <key>) (delay . <number>) (noreply? . <#t|#f>)))
delay defaults to 0 (immediately), and noreply?, as always, defaults to false.

Increment/Decrement Queries

(query h 'incr! `((key . <key>) (value . <number>) (noreply? . <#t|#f>)))
(query h 'decr! `((key . <key>) (value . <number>) (noreply? . <#t|#f>)))
The value here is a number.

Flush Query

(query h 'flush-all! '((delay . <number>) (noreply? . <#t|#f>)))
The delay defaults to 10 seconds; for the dht handle it works as a stagger interval.
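
One way to picture a stagger interval: each server in the dht gets its flush scheduled one interval later than the previous one, so the whole cache does not go cold at once. The sketch below is only my illustration of that idea under this assumption; stagger-delays is a hypothetical function and the server names are made up, so do not read it as the actual memcached/dht implementation.

```racket
#lang racket/base
;; Hypothetical illustration: assign each server a flush delay that grows
;; by `interval` seconds per server (0, interval, 2*interval, ...).
(define (stagger-delays servers interval)
  (for/list ((server (in-list servers))
             (i (in-naturals)))
    (cons server (* i interval))))

(stagger-delays '("cache-a" "cache-b" "cache-c") 10)
;; => '(("cache-a" . 0) ("cache-b" . 10) ("cache-c" . 20))
```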

That's it. Enjoy PLT with memcached.
