注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

小葫芦君(汉斯的博客)

博客迁移到新博客:https://blog.ssxingshou.com

 
 
 

日志

 
 
关于我

小小葫芦商城,为您提供高品质的商品,一流的产品,一流的包装服务,一流的物流服务,放心购买

网易考拉推荐

转载:基于ZooKeeper的分布式Session实现四  

2012-03-28 15:11:43|  分类: 默认分类 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
/**

     * 创建指定Session ID的节点(异步操作)

     * @param sid

     * @param waitFor 是否等待执行结果

     * @return

     */

    public static String asynCreateSessionNode(final SessionMetaData metadata, boolean waitFor) {

        Callable<String> task = new Callable<String>() {

            @Override

            public String call() throws Exception {

                return createSessionNode(metadata);

            }

        };

        try {

            Future<String> result = pool.submit(task);

            //如果需要等待执行结果

            if (waitFor) {

                while (true) {

                    if (result.isDone()) {

                        return result.get();

                    }

                }

            }

        } catch (Exception e) {

            log.error(e);

        }

        return null;

    }

 

    /**

     * 删除指定Session ID的节点

     * @param sid Session ID

     * @return

     */

    public static boolean deleteSessionNode(String sid) {

        ZooKeeper zk = connect(); //连接服务期

        if (zk != null) {

            String path = GROUP_NAME + "/" + sid;

            try {

                // 检查节点是否存在

                Stat stat = zk.exists(path, false);

                //如果节点存在则删除之

                if (stat != null) {

                    //先删除子节点

                    List<String> nodes = zk.getChildren(path, false);

                    if (nodes != null) {

                        for (String node : nodes) {

                            zk.delete(path + "/" + node, -1);

                        }

                    }

                    //删除父节点

                    zk.delete(path, -1);

                    log.debug("删除Session节点完成:[" + path + "]");

                    return true;

                }

            } catch (KeeperException e) {

                log.error(e);

            } catch (InterruptedException e) {

                log.error(e);

            } finally {

                close(zk);

            }

        }

        return false;

    }

 

    /**

     * 删除指定Session ID的节点(异步操作)

     * @param sid

     * @param waitFor 是否等待执行结果

     * @return

     */

    public static boolean asynDeleteSessionNode(final String sid, boolean waitFor) {

        Callable<Boolean> task = new Callable<Boolean>() {

            @Override

            public Boolean call() throws Exception {

                return deleteSessionNode(sid);

            }

        };

        try {

            Future<Boolean> result = pool.submit(task);

            //如果需要等待执行结果

            if (waitFor) {

                while (true) {

                    if (result.isDone()) {

                        return result.get();

                    }

                }

            }

        } catch (Exception e) {

            log.error(e);

        }

        return false;

    }

 

    /**

     * 在指定Session ID的节点下添加数据节点

     * @param sid Session ID

     * @param name 数据节点的名称

     * @param value 数据

     * @return

     */

    public static boolean setSessionData(String sid, String name, Object value) {

        boolean result = false;

        ZooKeeper zk = connect(); //连接服务器

        if (zk != null) {

            String path = GROUP_NAME + "/" + sid;

            try {

                // 检查指定的Session节点是否存在

                Stat stat = zk.exists(path, false);

                //如果节点存在则删除之

                if (stat != null) {

                    //查找数据节点是否存在,不存在就创建一个

                    String dataPath = path + "/" + name;

                    stat = zk.exists(dataPath, false);

                    if (stat == null) {

                        //创建数据节点

                        zk.create(dataPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

                        log.debug("创建数据节点完成[" + dataPath + "]");

                    }

                    //在节点上设置数据,所有数据必须可序列化

                    if (value instanceof Serializable) {

                        int dataNodeVer = -1;

                        if (stat != null) {

                            //记录数据节点的版本

                            dataNodeVer = stat.getVersion();

                        }

                        byte[] data = SerializationUtils.serialize((Serializable) value);

                        stat = zk.setData(dataPath, data, dataNodeVer);

                        log.debug("更新数据节点数据完成[" + dataPath + "][" + value + "]");

                        result = true;

                    }

                }

            } catch (KeeperException e) {

                log.error(e);

            } catch (InterruptedException e) {

                log.error(e);

            } finally {

                close(zk);

            }

        }

        return result;

    }

 

    /**

     * 删除指定Session ID的节点(异步操作)

     * @param sid

     * @param waitFor 是否等待执行结果

     * @return

     */

    public static boolean asynSetSessionData(final String sid, final String name,

                                             final Object value, boolean waitFor) {

        Callable<Boolean> task = new Callable<Boolean>() {

            @Override

            public Boolean call() throws Exception {

                return setSessionData(sid, name, value);

            }

        };

        try {

            Future<Boolean> result = pool.submit(task);

            //如果需要等待执行结果

            if (waitFor) {

                while (true) {

                    if (result.isDone()) {

                        return result.get();

                    }

                }

            }

        } catch (Exception e) {

            log.error(e);

        }

        return false;

    }

 

    /**

     * 返回指定Session ID的节点下数据

     * @param sid Session ID

     * @param name 数据节点的名称

     * @param value 数据

     * @return

     */

    public static Object getSessionData(String sid, String name) {

        ZooKeeper zk = connect(); //连接服务器

        if (zk != null) {

            String path = GROUP_NAME + "/" + sid;

            try {

                // 检查指定的Session节点是否存在

                Stat stat = zk.exists(path, false);

                if (stat != null) {

                    //查找数据节点是否存在

                    String dataPath = path + "/" + name;

                    stat = zk.exists(dataPath, false);

                    Object obj = null;

                    if (stat != null) {

                        //获取节点数据

                        byte[] data = zk.getData(dataPath, false, null);

                        if (data != null) {

                            //反序列化

                            obj = SerializationUtils.deserialize(data);

                        }

                    }

                    return obj;

                }

            } catch (KeeperException e) {

                log.error(e);

            } catch (InterruptedException e) {

                log.error(e);

            } finally {

                close(zk);

            }

        }

        return null;

    }

 

    /**

     * 删除指定Session ID的节点下数据

     * @param sid Session ID

     * @param name 数据节点的名称

     * @param value 数据

     * @return

     */

    public static void removeSessionData(String sid, String name) {

        ZooKeeper zk = connect(); //连接服务器

        if (zk != null) {

            String path = GROUP_NAME + "/" + sid;

            try {

                // 检查指定的Session节点是否存在

                Stat stat = zk.exists(path, false);

                if (stat != null) {

                    //查找数据节点是否存在

                    String dataPath = path + "/" + name;

                    stat = zk.exists(dataPath, false);

                    if (stat != null) {

                        //删除节点

                        zk.delete(dataPath, -1);

                    }

                }

            } catch (KeeperException e) {

                log.error(e);

            } catch (InterruptedException e) {

                log.error(e);

            } finally {

                close(zk);

            }

        }

    }

}

从这个类的实现中我们可以发现,与ZooKeeper交互的API非常的友好,基本上就是对文件系统的管理——创建文件、删除文件、检查文件是否存在,更新文件等等。并且对节点的查找就是对文件绝对路径的搜索,效率非常的高。例如,用户调用SessiongetAttribute(String key)方法,则根据当前Session可以拼装成一个搜索节点的路径:/SESSIONS/<Session ID>/<Key>。这样可以快速的定位,并获取该节点的数据。

另外,在这个类中,我还实现类一些操作的异步版本。原来是想为了提高用户响应度,在创建、修改Session节点的时候使用异步调用,但是实际测试下来还是有问题的。所以目前放弃了所有操作的异步版本。

最后让我们来看看连接ZooKeeper服务器的实现类,代码如下所示:

public class ConnectionWatcher implements Watcher {

    private static final int SESSION_TIMEOUT = 5000;

    private CountDownLatch signal = new CountDownLatch(1);

    private Logger log = Logger.getLogger(getClass());

 

    /**

     *

     * @throws IOException

     * @throws InterruptedException

     */

    public ZooKeeper connection(String servers) {

        ZooKeeper zk;

        try {

            zk = new ZooKeeper(servers, SESSION_TIMEOUT, this);

            signal.await();

            return zk;

        } catch (IOException e) {

            log.error(e);

        } catch (InterruptedException e) {

            log.error(e);

        }

        return null;

    }

 

    /*

     * (non-Javadoc)

     *

     * @see

     * org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)

     */

    public void process(WatchedEvent event) {

        KeeperState state = event.getState();

        if (state == KeeperState.SyncConnected) {

            signal.countDown();

        }

    }

}

这个类需要关注的是实现Watcher接口,在上面描述ZooKeeper特性的时候曾经提到过,ZooKeeper通过Watcher机制实现客户端与服务器之间的松耦合交互,在process方法中,通过对各种事件的监听,可以进行异步的回调处理。

这里的SESSION_TIMEOUT并不是Web容器中Session的超时。这是ZooKeeper对一个客户端的连接,即一个连接会话的超时设置。该值一般设置在25秒之间。

  评论这张
 
阅读(703)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017