注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

小葫芦君(汉斯的博客)

博客迁移到新博客:https://blog.ssxingshou.com

 
 
 

日志

 
 
关于我

小小葫芦商城,为您提供高品质的商品,一流的产品,一流的包装服务,一流的物流服务,放心购买

网易考拉推荐

转载:基于ZooKeeper的分布式Session实现三  

2012-03-28 15:10:51|  分类: 默认分类 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

这些方法中都是直接和ZooKeeper上对应的Session进行数据交换。本来我是想在本地Session对象上创建一个ZooKeeper的缓冲,当用户调用Session的读方法时,先到本地缓冲中读数据,读不到再到ZooKeeper上去取,这样可以减少网络的通讯开销。但在分布式环境下,这种策略所带来的数据同步开销更加的可观。因为每次一个子系统的Session数据更新,都将触发所有其他子系统与之关联的Session数据同步操作,否则Session中数据的一致性将无法得到保障。

看到这里,大家可能已经发觉了,所有与ZooKeeper交互的代码都被封装到ZooKeeperHelper类中,接下来就来看看这个类的实现。

4)   ZooKeeperHelper类实现

public class ZooKeeperHelper {

    /** 日志 */

    private static Logger          log        =Logger.getLogger(ZooKeeperHelper.class);

    private static String          hosts;

    private static ExecutorService pool       = Executors.newCachedThreadPool();

    private static final String    GROUP_NAME = "/SESSIONS";

 

    /**

     * 初始化

     */

    public static void initialize(Configuration config) {

        hosts = config.getServers();

    }

 

    /**

     * 销毁

     */

    public static void destroy() {

        if (pool != null) {

            //关闭

            pool.shutdown();

        }

    }

 

    /**

     * 连接服务器

     *

     * @return

     */

    public static ZooKeeper connect() {

        ConnectionWatcher cw = new ConnectionWatcher();

        ZooKeeper zk = cw.connection(hosts);

        return zk;

    }

 

    /**

     * 关闭一个会话

     */

    public static void close(ZooKeeper zk) {

        if (zk != null) {

            try {

                zk.close();

            } catch (InterruptedException e) {

                log.error(e);

            }

        }

    }

 

    /**

     * 验证指定ID的节点是否有效

     * @param id

     * @return

     */

    public static boolean isValid(String id) {

        ZooKeeper zk = connect();

        if (zk != null) {

            try {

                return isValid(id, zk);

            } finally {

                close(zk);

            }

        }

        return false;

    }

 

    /**

     * 验证指定ID的节点是否有效

     * @param id

     * @param zk

     * @return

     */

    public static boolean isValid(String id, ZooKeeper zk) {

        if (zk != null) {

            //获取元数据

            SessionMetaData metadata = getSessionMetaData(id, zk);

            //如果不存在或是无效,则直接返回null

            if (metadata == null) {

                return false;

            }

            return metadata.getValidate();

        }

        return false;

    }

 

    /**

     * 返回指定IDSession元数据

     * @param id

     * @return

     */

    public static SessionMetaData getSessionMetaData(String id, ZooKeeper zk) {

        if (zk != null) {

            String path = GROUP_NAME + "/" + id;

            try {

                // 检查节点是否存在

                Stat stat = zk.exists(path, false);

                //statnull表示无此节点

                if (stat == null) {

                    return null;

                }

                //获取节点上的数据

                byte[] data = zk.getData(path, false, null);

                if (data != null) {

                    //反序列化

                    Object obj = SerializationUtils.deserialize(data);

                    //转换类型

                    if (obj instanceof SessionMetaData) {

                        SessionMetaData metadata = (SessionMetaData) obj;

                        //设置当前版本号

                        metadata.setVersion(stat.getVersion());

                        return metadata;

                    }

                }

            } catch (KeeperException e) {

                log.error(e);

            } catch (InterruptedException e) {

                log.error(e);

            }

        }

        return null;

    }

 

    /**

     * 更新Session节点的元数据

     * @param id Session ID

     * @param version 更新版本号

     * @param zk

     */

    public static void updateSessionMetaData(String id) {

        ZooKeeper zk = connect();

        try {

            //获取元数据

            SessionMetaData metadata = getSessionMetaData(id, zk);

            if (metadata != null) {

                updateSessionMetaData(metadata, zk);

            }

        } finally {

            close(zk);

        }

    }

 

    /**

     * 更新Session节点的元数据

     * @param id Session ID

     * @param version 更新版本号

     * @param zk

     */

    public static void updateSessionMetaData(SessionMetaData metadata, ZooKeeper zk) {

        try {

            if (metadata != null) {

                String id = metadata.getId();

                Long now = System.currentTimeMillis(); //当前时间

                //检查是否过期

                Long timeout = metadata.getLastAccessTm() + metadata.getMaxIdle(); //空闲时间

                //如果空闲时间小于当前时间,则表示Session超时

                if (timeout < now) {

                    metadata.setValidate(false);

                    log.debug("Session节点已超时[" + id + "]");

                }

                //设置最后一次访问时间

                metadata.setLastAccessTm(now);

                //更新节点数据

                String path = GROUP_NAME + "/" + id;

                byte[] data = SerializationUtils.serialize(metadata);

                zk.setData(path, data, metadata.getVersion());

                log.debug("更新Session节点的元数据完成[" + path + "]");

            }

        } catch (KeeperException e) {

            log.error(e);

        } catch (InterruptedException e) {

            log.error(e);

        }

    }

 

    /**

     * 返回ZooKeeper服务器上的Session节点的所有数据,并装载为Map

     * @param id

     * @return

     */

    public static Map<String, Object> getSessionMap(String id) {

        ZooKeeper zk = connect();

        if (zk != null) {

            String path = GROUP_NAME + "/" + id;

            try {

                //获取元数据

                SessionMetaData metadata = getSessionMetaData(path, zk);

                //如果不存在或是无效,则直接返回null

                if (metadata == null || !metadata.getValidate()) {

                    return null;

                }

                //获取所有子节点

                List<String> nodes = zk.getChildren(path, false);

                //存放数据

                Map<String, Object> sessionMap = new HashMap<String, Object>();

                for (String node : nodes) {

                    String dataPath = path + "/" + node;

                    Stat stat = zk.exists(dataPath, false);

                    //节点存在

                    if (stat != null) {

                        //提取数据

                        byte[] data = zk.getData(dataPath, false, null);

                        if (data != null) {

                            sessionMap.put(node, SerializationUtils.deserialize(data));

                        } else {

                            sessionMap.put(node, null);

                        }

                    }

                }

                return sessionMap;

            } catch (KeeperException e) {

                log.error(e);

            } catch (InterruptedException e) {

                log.error(e);

            } finally {

                close(zk);

            }

        }

        return null;

    }

 

    /**

     * 创建一个组节点

     */

    public static void createGroupNode() {

        ZooKeeper zk = connect();

        if (zk != null) {

            try {

                // 检查节点是否存在

                Stat stat = zk.exists(GROUP_NAME, false);

                //statnull表示无此节点,需要创建

                if (stat == null) {

                    // 创建组件点

                    String createPath = zk.create(GROUP_NAME, null, Ids.OPEN_ACL_UNSAFE,

                        CreateMode.PERSISTENT);

                    log.debug("创建节点完成:[" + createPath + "]");

                } else {

                    log.debug("组节点已存在,无需创建[" + GROUP_NAME + "]");

                }

            } catch (KeeperException e) {

                log.error(e);

            } catch (InterruptedException e) {

                log.error(e);

            } finally {

                close(zk);

            }

        }

    }

 

    /**

     * 创建指定Session ID的节点

     * @param sid Session ID

     * @return

     */

    public static String createSessionNode(SessionMetaData metadata) {

        if (metadata == null) {

            return null;

        }

        ZooKeeper zk = connect(); //连接服务期

        if (zk != null) {

            String path = GROUP_NAME + "/" + metadata.getId();

            try {

                // 检查节点是否存在

                Stat stat = zk.exists(path, false);

                //statnull表示无此节点,需要创建

                if (stat == null) {

                    // 创建组件点

                    String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE,

                        CreateMode.PERSISTENT);

                    log.debug("创建Session节点完成:[" + createPath + "]");

                    //写入节点数据

                    zk.setData(path, SerializationUtils.serialize(metadata), -1);

                    return createPath;

                }

            } catch (KeeperException e) {

                log.error(e);

            } catch (InterruptedException e) {

                log.error(e);

            } finally {

                close(zk);

            }

        }

        return null;

    }

 

  评论这张
 
阅读(855)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017