首先搭建zookeeper集群docker-compose.yml

version: '2'

networks:
  zk:

services:
  zk1:
    image: zookeeper:3.4
    container_name: zk1
    networks:
      - zk
    ports:
      - "21811:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zk2:2888:3888 server.3=zk3:2888:3888
  zk2:
    image: zookeeper:3.4
    container_name: zk2
    networks:
      - zk
    ports:
      - "21812:2181"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zk3:2888:3888
  zk3:
    image: zookeeper:3.4
    container_name: zk3
    networks:
      - zk
    ports:
      - "21813:2181"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888 server.2=zk2:2888:3888 server.3=0.0.0.0:2888:3888

docker-compose up -d 创建并启动
检查状态

[root@localhost test]# docker exec -it zk1 bash ./bin/zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /conf/zoo.cfgMode: follower[root@localhost test]# docker exec -it zk2 bash ./bin/zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /conf/zoo.cfgMode: follower[root@localhost test]# docker exec -it zk3 bash ./bin/zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /conf/zoo.cfgMode: leader

 

下面我们来程序验证:

我们用现成的curator来操作zk分布式锁

import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.api.GetDataBuilder;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.ExponentialBackoffRetry;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.sql.Time;import java.util.Date;import java.util.concurrent.TimeUnit;public class DistributedLock {        public static Logger log = LoggerFactory.getLogger(DistributedLock.class);        private InterProcessMutex interProcessMutex;  //可重入排它锁        private String lockName;  //竞争资源标志        private String root = "/distributed/lock/";//根节点        private static CuratorFramework curatorFramework;        private static String ZK_URL = "127.0.0.1:21811,127.0.0.1:21812,127.0.0.1:21813";        static{            curatorFramework= CuratorFrameworkFactory.newClient(ZK_URL,new ExponentialBackoffRetry(1000,3));            curatorFramework.start();        }        /**         * 实例化         * @param lockName         */        public DistributedLock(String lockName){            try {                this.lockName = lockName;                interProcessMutex = new InterProcessMutex(curatorFramework, root + lockName);            }catch (Exception e){                log.error("initial InterProcessMutex exception="+e);            }        }        /**         * 获取锁         */        public void acquireLock(){            int flag = 0;            try {                //重试N次,每次最大等待1s                while (!interProcessMutex.acquire(1, TimeUnit.SECONDS)){                    flag++;                    if(flag>5){  //重试次                        break;                    }                }            } catch (Exception e) {                log.error("distributed lock acquire exception="+e);            }            if(flag>5){                log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock  busy"+ new 
Date().getTime());            }else{                log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock  success"+ new Date().getTime());            }        }        /**         * 释放锁         */        public void releaseLock(){            try {                if(interProcessMutex != null && interProcessMutex.isAcquiredInThisProcess()){                    interProcessMutex.release();                    curatorFramework.delete().inBackground().forPath(root+lockName);                    //byte[] data = curatorFramework.getData().forPath(root + lockName);                    log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock  success"+ new Date().getTime());                }            }catch (Exception e){                log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock  exception="+e);            }        }    }

 

接下来我们开2个线程来竞争分布式锁:

public class TestLock {    public static void main(String[] args) throws InterruptedException {        String lockName = "lock1";        DistributedLock lockFoo = new DistributedLock(lockName);        //lockFoo.acquireLock();        //lockFoo.releaseLock();        //        System.out.println("主线程ID是:" + Thread.currentThread().getId());        Thread thread1 = new MyThread("thread1",lockFoo);        Thread thread2 = new MyThread("thread2",lockFoo);        thread1.start();        Thread.sleep(1000);        thread2.start();    }}/** * 自定义线程 */class MyThread extends Thread {    /*线程名称*/    private String name;    private DistributedLock lockFoo;    public MyThread(String name,DistributedLock lockFoo) {        this.name = name;        this.lockFoo = lockFoo;    }    @Override    public void run() {        if(this.name.equals("thread1")) {            this.lockFoo.acquireLock();        }        System.out.println("名称" + name + "的线程ID是:" + Thread.currentThread().getId());        try {            Thread.sleep(3000);        } catch (InterruptedException e) {            e.printStackTrace();        }        if(this.name.equals("thread2")) {            this.lockFoo.releaseLock();        }    }}

程序中thread1获取了锁后,thread2解锁失败。原因是:Curator 的 InterProcessMutex 是由获取锁的那个线程持有的可重入锁,release() 只能由持有锁的线程调用;thread2 调用时虽然 isAcquiredInThisProcess() 为 true(同一进程内确实持有锁),但 release() 会抛出 IllegalMonitorStateException。

©著作权归作者所有:来自51CTO博客作者wx5eba5c8cb0b6f的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 如何用Redis实现分布式锁以及可用性
  2. 进程与线程 - 入门知识篇
  3. 使用Redis创建分布式锁
  4. mysql的复制
  5. 不惧疫情,中国SDS市场迎来井喷行情!
  6. Spring 获取 request 的几种方法及其线程安全性分析
  7. 争议 | 银行一定要上分布式数据库吗?有没有其他技术路线满足需求?
  8. java中wait和sleep的区别
  9. java并发中的Synchronized关键词

随机推荐

  1. Android Studio 2.0 to Android Studio 3
  2. 停止android monkey
  3. Android 两种HTTP Client的比较
  4. Android: Sensors on Android 2.3 - Ging
  5. Android: Android NDK Overview
  6. Android 内存泄漏场景分析
  7. Android android:persistentDrawingCache
  8. Android 短信发送器
  9. Android7.0中文文档(API)-- ShareActionPro
  10. AndroidStudio使用教程(第一弹)