分类: 2011 March

野鸡版delicious推荐系统

日子有点无聊,照着<Programming collective Intelligence>写了一个delicious推荐的小模型(代码),就当作是再学python的hello, world! :p 以后无聊了就跑一下代码,看看有什么值得看的, HoHo~

运行结果如下:

>>> from delicious import *
>>> table=initializeUserDict('programming')
>>> table['myself']={}
>>> fillItems(table) 考虑到国内的网络,冲杯咖啡先吧...
>>> from recommendations import *
# 对用户username推荐n条网页链接
>>> getRecommendations(table,"username")[0:n]
# 输出结果前者为推荐值,后者为链接
[(0.61988282180389409, 'http://html5boilerplate.com/mobile/')]
...

deliciousapi

deliciousapi是非官方的api,用于从delicious上拿数据,它的接口很简洁,作者博客上有相应的demo,这里不废话.

initializeUserDict and fillItems

initializeUserDict(tag, count)首先通过deliciousapi.DeliciousAPI().get_urls(‘tag’)获得某个tag下的count链接,然后通过.get_url(‘url’)获取该链接相关的信息,e.g. bookmarks,tags.最后收集bookmarks对应的用户到字典user_dict,将其返回.代码如下:

def initializeUserDict(tag, count=1):
   user_dict={}
   dapi = deliciousapi.DeliciousAPI()

   for url in dapi.get_urls(tag=tag)[0:count]:
      for item in dapi.get_url(url).bookmarks:
         user_dict[item[0]]={}
   return user_dict

fillIteams(user_dict)遍历字典user_dict,通过.get_user(user)获取user收集的bookmarks.如果某个url被所有user收藏,那么将user_dict[user][url]置位,否则清零.代码如下:

def fillItems(user_dict):
   all_items={}
   dapi = deliciousapi.DeliciousAPI()

   for user in user_dict:
      for i in range(3):
         try:
            posts=dapi.get_user(user)
            break
         except:
            print "Fail user "+user+", retrying"
            time.sleep(4)
      for item in posts.bookmarks:
      	 url=item[0]
      	 user_dict[user][url]=1.0
      	 all_items[url]=1

   for ratings in user_dict.values():
      for item in all_items:
         if item not in ratings:
         	ratings[item]=0.0

经过initializeUserDict()和fillItems()两步后,得到的user_dict形如:

{'username': {'http://url': 0.0}, {'http://lru': 1.0},
 'nameuser': {'http://url': 0.0}, {'http://lru': 1.0},
  ...
}

getRecommendations

现在看看最核心的推荐算法,其实很简单:

def sim_distance(prefs, person1, person2):
   si={}
   for item in prefs[person1]:
      if item in prefs[person2]:
           si[item]=1

   if len(si)==0: return 0


   sum_of_squares=sum([pow(prefs[person1][item]-prefs[person2][item], 2)
                     for item in prefs[person1] if item in prefs[person2]])

   return 1/(1+sqrt(sum_of_squares))

# gets recommendations for a person by using a weighted average
# of every other user's rankings
def getRecommendations(prefs, person, similarity=sim_distance):
   totals={}
   simSums={}

   for other in prefs:
      if other==person: continue
      sim=similarity(prefs,person,other)

      if sim<=0: continue

      for item in prefs[other]:
         # only score those haven't seen yet
         if item not in prefs[person] or prefs[person][item]==0:
            #similarity * score
            totals.setdefault(item,0)
            totals[item]+=prefs[other][item]*sim
            # sum of similarities
            simSums.setdefault(item,0)
            simSums[item]+=sim

   #create the normolized list
   rankings=[(total/simSums[item],item) for item,total in totals.items()]

   # return the sorted list
   rankings.sort()
   rankings.reverse()
   return rankings

参数person表示被推荐人,而参数similarity表示计算Correlation and dependence的函数,默认值为sim_distance,其实也就是个Euclidean Distance.

Conclusion and next steps

小模型很简单,有空时可以完善一下算法.( 如果让C程序员看见如此大的稀疏矩阵,那得有多纠结 :( )

小计划

最近太挫了,不知道为什么总是很烦躁,没有多少心思读书,感觉日子过得很快,一眨眼时间就没了.明明知道是焦虑的缘故,可仍旧魂不守舍,静不下来.

毕业之前的这段时间我都打算在学校里度过了.为了抓住时间,定个小计划吧:

  • 学习Linux内核,主要是网络协议栈.
  • 读完lighttpd代码,之后再读点其他代码.
  • 写一个httpd.
  • 彻底入门python.
  • boost? design pattern?

为了SMART一点,我要多写博客,多努力.

最后,多放松,只剩下最后这点自由的日子了,唉~.

libevent源码浅析: http库

libevent自带了一个http库,用它可以很简单的实现一个http服务器,本文非常简单地分析之.

evhttp

evhttp库有几个主要的结构体,它们之间的联系非常龌龊:

其中,结构体event, min_heap, evsignal_info, eventop, event_base在前面几篇文章中已经介绍过了,这里不再啰嗦.

evbuffer

evbuffer用于读或写缓冲,图示为:

和evbuffer有关的外露接口主要是:

//从文件读数据到缓冲,读取量为max(howmuch, 4096)
int evbuffer_read(struct evbuffer *buf, int fd, int howmuch);

//把缓冲写出文件
int evbuffer_write(struct evbuffer *buffer, int fd)

evbuffer比较简单,不多介绍.

evhttp, evhttp_connection, evhttp_request

libevent对成员的命名不太在意,其实evhttp可以看做是echttpsever,它绑定到某个特定端口和地址(socket(), bind()),保存访问该server的连接(通过成员connections,).evhttp_connection是保存连接信息的结构体, evhttp_request表示请求.

看看http库的使用流程:

void http_handler(struct evhttp_request *req, void *arg)
{
    struct evbuffer *buf;
    buf = evbuffer_new();
    
    // 分析请求
    char *decode_uri = strdup((char*) evhttp_request_uri(req));
    struct evkeyvalq http_query;
    evhttp_parse_query(decode_uri, &http_query);
    free(decode_uri);
        
    // 从http头中获取参数
    const char *request_value = evhttp_find_header(&http_query, "data");
        
    // 返回HTTP头部
    evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=UTF-8");
    evhttp_add_header(req->output_headers, "Server", "my_httpd");
    //evhttp_add_header(req->output_headers, "Connection", "keep-alive");

    evhttp_add_header(req->output_headers, "Connection", "close");
    
    // 将要输出的值写入输出缓存
    if(request_value != NULL) {
        evbuffer_add_printf(buf, "%s", request_value);
    } else {
        evbuffer_add_printf(buf, "%s", "no error.");
    }
    
    // 输出
    evhttp_send_reply(req, HTTP_OK, "OK", buf);
        
    // 内存释放
    evhttp_clear_headers(&http_query);
    evbuffer_free(buf);
}

int main(int argc, char **argv)
{
    char *host_ip = "0.0.0.0";
    int host_port = 8080;
    int timeout = 3;

    struct evhttp *httpd;

    event_init();

    //根据host_ip和host_port创建一个addrinfo结构体,然后创建一个socket,绑定到这个socket后,
    //根据这些信息得到得到一个event(回调函数设置为accept_socket),然后将这个event关联到对应的event_base,
    //之后插入到&http->sockets队列中,然后返回&http	
    httpd = evhttp_start(host_ip, host_port);

    if (httpd == NULL) {
        fprintf(stderr, "Error: Unable to listen on %s:%d\n\n", host_ip, host_port);        
        exit(1);        
    }

    // 设置请求超时时间
    evhttp_set_timeout(httpd, timeout);

    // 设置请求的处理函数
    evhttp_set_gencb(httpd, http_handler, NULL);

    event_dispatch();

    evhttp_free(httpd);

    return 0;
}

[1] 首先看看evhttp_start():

//创建一个evhttp,绑定到端口和地址
struct evhttp * evhttp_start(const char *address, u_short port)
{
	struct evhttp *http = evhttp_new_object();
	evhttp_bind_socket(http, address, port);
	return (http);
}

函数evhttp_bind_socket()代码如下:

//根据address和port创建一个非阻塞的socket,
//将其bind后的fd创建一个event(在这里设置好回调函数)后添加到&amp;amp;amp;http-&amp;amp;gt;sockets
int evhttp_bind_socket(struct evhttp *http, const char *address, u_short port)
{
	int fd;
	int res;

	//绑定一个socket
	fd = bind_socket(address, port, 1 /*reuse*/);

	//根据fd创建一个event,设置好回调函数,
        //然后将这个event关联到对应的event_base,并将它插入到&amp;amp;amp;http-&amp;amp;gt;sockets中.
	res = evhttp_accept_socket(http, fd);

	return (res);
}

在这里,函数bing_socket()的作用是根据地址和端口创建一个socket,返回bind()后的文件描述符.函数evhttp_accept_socket()的作用在注释中也说明了,其代码如下:

int evhttp_accept_socket(struct evhttp *http, int fd)
{
	struct evhttp_bound_socket *bound;
	struct event *ev;
	int res;

	bound = malloc(sizeof(struct evhttp_bound_socket));
	ev = &bound->bind_ev;

	/* Schedule the socket for accepting */
	//设置这个ev,回调函数为accept_socket,针对的文件描述符为fd
	event_set(ev, fd, EV_READ | EV_PERSIST, accept_socket, http);

	//将ev关联到&http->base
	EVHTTP_BASE_SET(http, ev);

	//将ev添加进&http->base
	res = event_add(ev, NULL);

	//将bound插入到&http->sockets
	TAILQ_INSERT_TAIL(&http->sockets, bound, next);
}

需要指出的是,在这个函数中,struct event *ev可以看成是服务器struct evhttp的代理,evhttp通过这个ev是否可读来注意到是否有新的连接.(后文会分析.)

[2] 函数evhttp_set_timeout()和evhttp_set_gencb()逻辑比较简单,分别设置超时时间和回调函数.

[3]重头戏来了,函数event_dispatch()负责分发,在前面的文章已经介绍过了,它最终会调用event_base_loop(),分别查看定时器最小堆,信号队列和I/O队列.在http库中,当有一个新的连接时,[1]中已加入到event_base已注册事件队列的事件ev->fd将变成可读,它被移入已就绪事件队列,然后由函数event_process_active()调用ev的回调函数accept_socket()(回调函数在evhttp_accept_socket()函数中设置).

需要说明的是,以下的内容都是在event_base_loop()死循环中被处理的.

现在看一下回调函数accept_socket()的代码:

//作为回调函数,accept 一个 socket
static void accept_socket(int fd, short what, void *arg)
{
	struct evhttp *http = arg;
	struct sockaddr_storage ss;
	socklen_t addrlen = sizeof(ss);
	int nfd;

        //获得accept()后的文件描述符
        nfd = accept(fd, (struct sockaddr *)&ss, &addrlen);
	
        //设置为非阻塞
	evutil_make_socket_nonblocking(nfd);

        //获得连接
	evhttp_get_request(http, nfd, (struct sockaddr *)&ss, addrlen);
}

代码很好懂,看看evhttp_get_request()函数:

//在回调函数accept_socket中被调用.
//这里传入的参数fd是accept()后返回的描述符
void evhttp_get_request(struct evhttp *http, int fd, struct sockaddr *sa, socklen_t salen)
{
	struct evhttp_connection *evcon;

	//根据fd和sa创建一个evhttp_connection,并将它关联到http->base.
	evcon = evhttp_get_request_connection(http, fd, sa, salen);

	if (http->timeout != -1)
		//watch out!!!在这里evcon会被设置超时时间.
		evhttp_connection_set_timeout(evcon, http->timeout);

	 //将evcon关联到http
	evcon->http_server = http;

	//将evcon插入到&http->connections
	TAILQ_INSERT_TAIL(&http->connections, evcon, next);

	evhttp_associate_new_request_with_connection(evcon);
}

跟踪下去看看evhttp_associate_new_request_with_connection()函数:

//初始化一个绑定到evcon的evhttp_request
static int evhttp_associate_new_request_with_connection(struct evhttp_connection *evcon)
{
	struct evhttp *http = evcon->http_server;
	struct evhttp_request *req;
        
        //在这里会设置该req的回调函数evhttp_handle_request(),此函数很重要..
	req = evhttp_request_new(evhttp_handle_request, http);

	req->evcon = evcon;
	req->flags |= EVHTTP_REQ_OWN_CONNECTION;
	
	TAILQ_INSERT_TAIL(&evcon->requests, req, next);
	
	req->kind = EVHTTP_REQUEST;
	
	req->remote_host = strdup(evcon->address);

	req->remote_port = evcon->port;

	evhttp_start_read(evcon);
	
	return (0);
}

经过这么多层次的函数调用,终于要读数据了,evhttp_start_read()代码:

void evhttp_start_read(struct evhttp_connection *evcon)
{
	/* Set up an event to read the headers */
	if (event_initialized(&evcon->ev))
		event_del(&evcon->ev);

	//根据这些参数设置好evcon->ev.回调函数为evhttp_read()
	event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon);
	
        //关联到event_base中
	EVHTTP_BASE_SET(evcon, &evcon->ev);

	//将该ev插入到event_base中

	//watch out!!!!
	//在这里会设置这个event的超时时间,它将被加入到定时器最小堆中
	//超时之后,该事件会被event_active(),插入到就绪队列中,然后执行其回调函数.
	
	//evcon->timeout是在evhttp_get_request()被设置的
	evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
	evcon->state = EVCON_READING_FIRSTLINE;
}

可以看到,对于这个连接,evhttp_connection结构体evcon是通过内部成员event *ev来处理的.函数evhttp_start_read()对&evcon->ev设置好超时时间和回调函数后将它插入到event_base中.

直到这里,回调函数accept_socket()的功能终于完成了.

(3.2) 上一段提到accept_socket()函数最终会调用evhttp_start_read()来设置连接对应的event(&evcon->ev)的超时时间和回调函数,并将它插入已激活事件队列进行schedule.

在&evcon->ev超时之后,它会被函数timeout_process()从已激活事件队列移入已就绪事件队列,然后由函数event_process_active()调用它的回调函数,也即是evhttp_read()(此回调函数在函数evhttp_start_read()中设置).代码如下:

//读数据
void evhttp_read(int fd, short what, void *arg)
{
	struct evhttp_connection *evcon = arg;
	//拿到第一个req
	struct evhttp_request *req = TAILQ_FIRST(&evcon->requests);
	struct evbuffer *buf = evcon->input_buffer;
	int n, len;

	if (what == EV_TIMEOUT) {
		evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
		return;
	}

	//从fd读数据到buf
	n = evbuffer_read(buf, fd, -1);
	len = EVBUFFER_LENGTH(buf);
	
	if (n == -1) {
		if (errno != EINTR && errno != EAGAIN) {
			event_debug(("%s: evbuffer_read", __func__));
			evhttp_connection_fail(evcon, EVCON_HTTP_EOF);
		} else {
			evhttp_add_event(&evcon->ev, evcon->timeout,
			    HTTP_READ_TIMEOUT);	       
		}
		return;
	} else if (n == 0) {
		/* Connection closed */
		evhttp_connection_done(evcon);
		return;
	}

	switch (evcon->state) {
	case EVCON_READING_FIRSTLINE:
		evhttp_read_firstline(evcon, req);
		break;
	case EVCON_READING_HEADERS:
		evhttp_read_header(evcon, req);
		break;
	case EVCON_READING_BODY:
		evhttp_read_body(evcon, req);
		break;
	case EVCON_READING_TRAILER:
		evhttp_read_trailer(evcon, req);
		break;
	case EVCON_DISCONNECTED:
	case EVCON_CONNECTING:
	case EVCON_IDLE:
	case EVCON_WRITING:
	default:
		event_errx(1, "%s: illegal connection state %d",
			   __func__, evcon->state);
	}
}

代码中的fd其实是evcon->fd,也就是accept()后返回的文件描述符..

函数evhttp_read()就这么一直读数据下去(可能经过了多次循环,因为在evhttp_accept_socket()函数中被设置了EV_PERSIST标志,所以它不会从已注册时间队列中被移除,而是不断的超时,不断地被调用其回调函数),直到数据读完了(这里经过了好多状态,非常让人不爽的是,libevent官网上连个FSM图都没有,这种体力活我也不会干的,哈哈~),就调用evhttp_connection_done(),代码如下:

//累个半死终于读完啦
static void evhttp_connection_done(struct evhttp_connection *evcon)
{
	...//省略

        //调用req的回调函数
	(*req->cb)(req, req->cb_arg);

}

在这里,会调用req的回调函数,也就是在函数evhttp_associate_new_request_with_connection()中设置的evhttp_handle_request(),此回调函数代码为:

//处理请求,在这里会调用http的回调函数http->gencb
static void evhttp_handle_request(struct evhttp_request *req, void *arg)
{
        ...//一堆无用的噪音
	
        //由用户指定的回调函数终于显灵了.
	if (http->gencb) {
		(*http->gencb)(req, http->gencbarg);
		return;
	}

}

在数据全都读入后,libevent终于终于终于调用了用户指定的回调函数(*http->gencb).在本文一开始的小例子中,也就是函数http_handler(),要达到这一步可真不容易啊,撒花..

由上文提到的种种的繁琐的过程可以看出,libevent对于user来说是很友善的,几句代码就可以实现一个httpd,可以对于developer来说就太恶心了..