注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

小葫芦君(汉斯的博客)

博客迁移到新博客:https://blog.ssxingshou.com

 
 
 

日志

 
 
关于我

小小葫芦商城,为您提供高品质的商品,一流的产品,一流的包装服务,一流的物流服务,放心购买

网易考拉推荐

转载:基于ZooKeeper的分布式Session实现二  

2012-03-28 15:09:34|  分类: 默认分类 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

5.   算法实现

从上述的挑战来看,要写一个分布式应用程序是困难的,主要原因是因为局部故障。由于数据需要通过网络传输,而网络是不稳定的,所以如果网络发生故障,则所有的数据通讯都将终止。ZooKeeper并不能解决网络故障的发生,甚至它本身也是基于网络的分布式应用程序。但是它为我们提供了一套工具集合,帮助我们建立安全处理局部故障的分布式应用程序。接下来我们就开始描述如何实现基于ZooKeeper的分布式Session系统。

1)   基于ZooKeeper的分布式Session系统架构

 

 

为了实现高可用性,采用了ZooKeeper集群,ZooKeeper集 群是由一台领导者服务器和若干台跟随者服务器构成(总服务器数要奇数)。所有的读操作由跟随者提供,而写操作由领导者提供,并且领导者还负责将写入的数据 复制到集群中其他的跟随者。当领导者服务器由于故障无法访问时,剩下的所有跟随者服务器就开始进行领导者的选举。通过选举算法,最终由一台原本是跟随者的 服务器升级为领导者。当然原来的领导者服务器一旦被恢复,它就只能作为跟随者服务器,并在下一次选举中争夺领导者的位置。

Web容器中的Session容器也将发生变化。它不再对用户的Session进行本地管理,而是委托给ZooKeeper和我们自己实现的Session管理器。也就是说,ZooKeeper负责Session数据的存储,而我们自己实现的Session管理器将负责Session生命周期的管理。

最后是关于在分布式环境下共享Session ID的策略。我们还是通过客户端的Cookie来实现,我们会自定义一个Cookie,并通过一定的算法在多个子系统之间进行共享。下面会对此进行详细的描述。

2)   分布式Session的数据模型

Session数据的存储是有一定格式的,下图展示了一个Session ID”1gyh0za3qmld7”SessionZooKeeper上的存储结构:

 

“/SESSIONS”是一个组节点,用来在ZooKeeper上划分不同功能组的定义。你可以把它理解为一个文件夹目录。在这个目录下可以存放0个或N个子节点,我们就把一个Session的实例作为一个节点,节点的名称就是Session ID。在ZooKeeper中,每个节点本身也可以存放一个字节数组。因此,每个节点天然就是一个Key-Value键值对的数据结构。

我们将Session中的用户数据(本质上就是一个Map)设计成多节点,节点名称就是Sessionkey,而节点的数据就是SessionValue。采用这种设计主要是考虑到性能问题和ZooKeeper对节点大小的限制问题。当然,我们可以将Session中的用户数据保存在一个Map中,然后将Map序列化之后存储在对应的Session节点中。但是大部分情况下,我们在读取数据时并不需要整个Map,而是Map中的一个或几个值。这样就可以避免一个非常大的Map在网络间传来传去。同理,在写Session的时候,也可以最大限度的减少数据流量。另外由于ZooKeeper是一个小文件系统,为了性能,每个节点的大小为1MB。如果Session中的Map大于1MB,就不能单节点的存储了。当然,一个Key的数据量是很少会超过1MB的,如果真的超过1MB,你就应该考虑一下,是否应该将此数据保存在Session中。

最后我们来关注一下Session节点中的数据——SessionMetaData。它是一个Session实例的元数据,保存了一些与Session生命周期控制有关的数据。以下代码就是SessionMetaData的实现:

public class SessionMetaData implements Serializable {

    private static final long serialVersionUID = -6446174402446690125L;

    private String            id;

    /**session的创建时间*/

    private Long              createTm;

    /**session的最大空闲时间*/

    private Long              maxIdle;

    /**session的最后一次访问时间*/

    private Long              lastAccessTm;

    /**是否可用*/

    private Boolean           validate         = false;

    /**当前版本*/

    private int               version          = 0;

 

    /**

     * 构造方法

     */

    public SessionMetaData() {

        this.createTm = System.currentTimeMillis();

        this.lastAccessTm = this.createTm;

        this.validate = true;

}

 

……以下是Ngettersetter方法

 

其中需要关注的属性有:

a)     id属性:Session实例的ID

b)     maxIdle属性:Session的最大空闲时间,默认情况下是30分钟。

c)     lastAccessTm属性:Session的最后一次访问时间,每次调用Request.getSession方法时都会去更新这个值。用来计算当前Session是否超时。如果lastAccessTm+maxIdle小于System. currentTimeMillis(),就表示当前Session超时。

d)     validate属性:表示当前Session是否可用,如果超时,则此属性为false

e)     version属性:这个属性是为了冗余Znodeversion值,用来实现乐观锁,对Session节点的元数据进行更新操作。

这里有必要提一下一个老生常谈的问题,就是所有存储在节点上的对象必须是可序列化的,也就是必须实现Serializable接口,否则无法保存。这个问题在memcachedZooKeeper上都存在的。

3)   实现过程

实现分布式Session的第一步就是要定义一个filter,用来拦截HttpServletRequest对象。以下代码片段,展现了在Jetty容器下的filter实现。

public class JettyDistributedSessionFilter extends DistributedSessionFilter {

    private Logger log = Logger.getLogger(getClass());

 

    @Override

    public void init(FilterConfig filterConfig) throws ServletException {

        super.init(filterConfig);

        // 实例化Jetty容器下的Session管理器

        sessionManager = new JettyDistributedSessionManager(conf);

        try {

            sessionManager.start(); // 启动初始化

            //创建组节点

            ZooKeeperHelper.createGroupNode();

            log.debug("DistributedSessionFilter.init completed.");

        } catch (Exception e) {

            log.error(e);

        }

    }

 

    @Override

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)

                                                                                             throws IOException,

                                                                                             ServletException {

        // Jetty容器的Request对象包装器,用于重写Session的相关操作

        JettyRequestWrapper req = new JettyRequestWrapper(request, sessionManager);

        chain.doFilter(req, response);

    }

}

这个filter是继承自DistributedSessionFilter的,这个父类主要是负责完成初始化参数设置等通用方法的实现,代码如下所示:

public abstract class DistributedSessionFilter implements Filter {

    protected Logger           log      = Logger.getLogger(getClass());

    /**参数配置*/

    protected Configuration    conf;

    /**Session管理器*/

    protected SessionManager   sessionManager;

    /**初始化参数名称*/

    public static final String SERVERS  = "servers";

    public static final String TIMEOUT  = "timeout";

    public static final String POOLSIZE = "poolsize";

 

    /**

     * 初始化

     * @see javax.servlet.Filter#init(javax.servlet.FilterConfig)

     */

    @Override

    public void init(FilterConfig filterConfig) throws ServletException {

        conf = new Configuration();

        String servers = filterConfig.getInitParameter(SERVERS);

        if (StringUtils.isNotBlank(servers)) {

            conf.setServers(servers);

        }

        String timeout = filterConfig.getInitParameter(TIMEOUT);

        if (StringUtils.isNotBlank(timeout)) {

            try {

                conf.setTimeout(Long.valueOf(timeout));

            } catch (NumberFormatException ex) {

                log.error("timeout parse error[" + timeout + "].");

            }

        }

        String poolsize = filterConfig.getInitParameter(POOLSIZE);

        if (StringUtils.isNotBlank(poolsize)) {

            try {

                conf.setPoolSize(Integer.valueOf(poolsize));

            } catch (NumberFormatException ex) {

                log.error("poolsize parse error[" + poolsize + "].");

            }

        }

        //初始化ZooKeeper配置参数

        ZooKeeperHelper.initialize(conf);

    }

 

    /**

     * 销毁

     * @see javax.servlet.Filter#destroy()

     */

    @Override

    public void destroy() {

        if (sessionManager != null) {

            try {

                sessionManager.stop();

            } catch (Exception e) {

                log.error(e);

            }

        }

        //销毁ZooKeeper

        ZooKeeperHelper.destroy();

        log.debug("DistributedSessionFilter.destroy completed.");

    }

filter中需要关注的重点是doFilter方法。

    @Override

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)

                                                                                             throws IOException,

                                                                                             ServletException {

        // Jetty容器的Request对象包装器,用于重写Session的相关操作

        JettyRequestWrapper req = new JettyRequestWrapper(request, sessionManager);

        chain.doFilter(req, response);

    }

}

这里实例化了一个包装器(装饰者模式)类,用来包装Jetty容器的Request对象,并覆盖其getSession方法。 另外我们还自己实现sessionManager接口,用来管理Session的生命周期。通过filter机制,我们就接管了Session的整个生命周期的管理权。

接下来我们来看看,Request包装器是如何重写getSession方法,替换成使用ZooKeeper上的Session数据。关键代码如下所示:

@Override

    public HttpSession getSession(boolean create) {

        //检查Session管理器

        if (sessionManager == null && create) {

            throw new IllegalStateException("No SessionHandler or SessionManager");

        }

        if (session != null && sessionManager != null) {

            return session;

        }

 

        session = null;

 

        //从客户端cookie中查找Session ID

        String id = sessionManager.getRequestSessionId(request);

        log.debug("获取客户端的Session ID:[" + id + "]");

        if (id != null && sessionManager != null) {

            //如果存在,则先从管理器中取

            session = sessionManager.getHttpSession(id, request);

            if (session == null && !create) {

                return null;

            }

        }

        //否则实例化一个新的Session对象

        if (session == null && sessionManager != null && create) {

            session = sessionManager.newHttpSession(request);

        }

        return session;

    }

 

其实实现很简单,大部分工作都委托给了sessionManager来处理。因此,还是让我们来关注sessionManager的相关方法实现。

A)   获取Session ID:

@Override

    public String getRequestSessionId(HttpServletRequest request) {

        return CookieHelper.findSessionId(request);

    }

这个方法就是从客户端的Cookies中查找我们的一个自定义的Cookie值,这个Cookie的名称为:DISTRIBUTED_SESSION_ID”(Web容器自己也在Cookie中写了一个值,用来在不同的request中传递Session ID,这个Cookie的名称叫“JSESSIONID)。如果返回null,则表示客户端从来都没有创建过Session实例。

B)   如果返回的Cookie值不为null,则有3种可能性:其一,已经实例化过一个Session对象并且可以正常使用;其二,虽然已经实例化过了,但是可能此Session已经超时失效;其三,分布式环境中的其他子系统已经实例化过了,但是本系统中还未实例化过此Session对象。所以先要对已经存在的Session ID进行处理。关键代码如下:

@Override

    public HttpSession getHttpSession(String id, HttpServletRequest request) {

        //类型检查

        if (!(request instanceof Request)) {

            log.warn("不是Jetty容器下的Request对象");

            return null;

        }

        //HttpServletRequest转换成Jetty容器的Request类型

        Request req = (Request) request;

        //ZooKeeper服务器上查找指定节点是否有效

        boolean valid = ZooKeeperHelper.isValid(id);

        //如果为false,表示服务器上无该Session节点,需要重新创建(返回null)

        if (!valid) {

            //删除本地的副本

            sessions.remove(id);

            return null;

        } else {

            //更新Session节点的元数据

            ZooKeeperHelper.updateSessionMetaData(id);

            HttpSession session = sessions.get(id);

            //如果存在,则直接返回

            if (session != null) {

                return session;

            }

            //否则创建指定IDSession并返回(用于同步分布式环境中的其他机器上的Session本地副本)

            session = new JettyDistributedSession((AbstractSessionManager) req.getSessionManager(),

                System.currentTimeMillis(), id);

            sessions.put(id, session);

            return session;

        }

    }

首先根据IDZooKeeper上验证此Session是否有效,如果无效了,则直接返回null,表示此Session已经超时不可用,同时需要删除本地的“影子”Session对象(不管存在与否)。如果该节点有效,则首先更新该Session节点的元数据(例如,最后一次访问时间)。然后先到本地的Session容器中查找是否存在该IDSession对象。本地Session容器中的Session对象并不用来保存用户数据,也不进行生命周期管理,纯粹为了在不同请求中进行传递。唯一有价值的就Session ID,因此,我喜欢把本地Session容器中的Session对象称为“影子”Session,它只是ZooKeeper上真正Session的一个影子而已。

如果Session节点没有失效,但是本地Session容器并没有指定ID影子”Session,则表示是第三种可能性,需要进行影子Session的同步。正如代码中所展示的,我们实例化一个指定IDSession对象,并放入当前系统的Session容器中,这样就完成了Session ID在分布式环境中的共享,以及Session对象在各子系统之间的同步。

C)   如果通过上面的方法返回的Session对象还是null,则真的需要实例化一个Session对象了,代码如下所示:

    public HttpSession newHttpSession(HttpServletRequest request) {

        //类型检查

        if (!(request instanceof Request)) {

            log.warn("不是Jetty容器下的Request对象");

            return null;

        }

        //HttpServletRequest转换成Jetty容器的Request类型

        Request req = (Request) request;

        Session session = new JettyDistributedSession(

            (AbstractSessionManager) req.getSessionManager(), request);

        addHttpSession(session, request);

        String id = session.getId();

        // cookie

        Cookie cookie = CookieHelper.writeSessionIdToCookie(id, req, req.getConnection()

            .getResponse());

        if (cookie != null) {

            log.debug("Wrote sid to Cookie,name:[" + cookie.getName() + "],value:["

                      + cookie.getValue() + "]");

        }

        //ZooKeeper服务器上创建session节点,节点名称为Session ID

        //创建元数据

        SessionMetaData metadata = new SessionMetaData();

        metadata.setId(id);

        metadata.setMaxIdle(config.getTimeout() * 60 * 1000); //转换成毫秒

        ZooKeeperHelper.createSessionNode(metadata);

        return session;

    }

以上代码会实例化一个Session对象,并将Session ID写入客户端Cookie中,最后实例化Session元数据,并在ZooKeeper上新建一个Session节点。

通过上面步骤,我们就将Session的整个生命周期管理与ZooKeeper关联起来了。接下来我们看看Session对象的几个重要方法的重写:

public synchronized Object getAttribute(String name) {

        //获取session ID

        String id = getId();

        if (StringUtils.isNotBlank(id)) {

            //返回Session节点下的数据

            return ZooKeeperHelper.getSessionData(id, name);

        }

        return null;

    }

 

public synchronized void removeAttribute(String name) {

        //获取session ID

        String id = getId();

        if (StringUtils.isNotBlank(id)) {

            //删除Session节点下的数据

            ZooKeeperHelper.removeSessionData(id, name);

        }

    }

 

public synchronized void setAttribute(String name, Object value) {

        //获取session ID

        String id = getId();

        if (StringUtils.isNotBlank(id)) {

            //将数据添加到ZooKeeper服务器上

            ZooKeeperHelper.setSessionData(id, name, value);

        }

    }

 

public void invalidate() throws IllegalStateException {

        //获取session ID

        String id = getId();

        if (StringUtils.isNotBlank(id)) {

            //删除Session节点

            ZooKeeperHelper.deleteSessionNode(id);

        }

    }
  评论这张
 
阅读(1770)| 评论(1)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017