spring-data-redis之秒杀

在现代电商中,秒杀是常用的促销手段,秒杀带来促销的同时,无疑给IT系统带来巨大冲击,今天我们就来模拟下秒杀。

超卖原因

下面我们说说,抢购流程!

购买流程图

在上面的图中,如果购买的人只有一个是没有问题的,但是如果是两个就有问题,可能会出现“超卖现象”。

  1. 有A,B两个线程操作数据库。
  2. 假设此时库存为1,即只允许一人买。
  3. A先查到数据库获取库存,A进行购买,支付成功后库存进行减一操作,但此时由于CPU切换,A被挂起。
  4. B访问数据库,此时A还未更新,B同样进行了购买,B成功进行了减一操作,数据库数量变为0,这个时候A被唤醒,也进行了减一操作,这就导致了库存变为-1,导致超卖。

如何控制?

方式一

加锁!最简单的方式,强制线程同步。同时缺点也很明显,那就是某一个线程操作的时候,其他线程必须等待它执行结束才能获取执行权。synchronized是典型的悲观锁,等待时间非常长,响应慢 。

方式二

使用队列,所有的请求全部进入队列,取其中前n个处理,其他的丢弃。但是并发量高的话,并发量会让队列内存瞬间升高,影响服务器性能。

方式三

使用乐观锁,乐观锁的一种实现是CAS(Compare And Swap),其思想是假设数据一般情况下没有冲突,在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做(在当前的情景下,直接返回秒杀失败即可)。

乐观锁实践

乐观锁,可以用redis的watch实现,也可以用MySQL实现,高并发情况下还是选择redis比较靠谱。其MySQL实现,请参看我的另一篇博客《乐观锁在MySQL中的应用》。

redis为什么能实现乐观锁?

在Redis的事务中,WATCH命令可用于提供CAS功能。假设我们通过WATCH命令在事务执行之前监控了多个Keys,倘若在WATCH之后有任何Key的值发生了变化,EXEC命令执行的事务都将被放弃,同时返回Null multi-bulk应答以通知调用者事务执行失败。

具体实践

当商户在后台添加商品的时候,输入商品数,此时将商品的id作为key(key有规范的哦!),数量作为值,存入redis。

Talk is cheap, show me the code!

spring-redis.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 连接池配置 最大连接数 最大空闲数 最长等待时间 连接是否可用-->
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<!--最大连接数-->
<property name="maxTotal" value="${redis.maxActive}"/>
<!--最大空闲数-->
<property name="maxIdle" value="${redis.maxIdle}"/>
<!--最大连接数后最长阻塞时间-->
<property name="maxWaitMillis" value="${redis.maxWaitMillis}"/>
<!--当连接给调用者使用时,是否检测其有效性-->
<property name="testOnBorrow" value="${redis.testOnBorrow}"/>
<!--归还连接时,是否检测其有效性-->
<property name="testOnReturn" value="${redis.testOnReturn}"/>
<!--达到最大连接数是否阻塞-->
<property name="blockWhenExhausted" value="${redis.blockWhenExhausted}"/>
<!--空闲连接的检测周期-->
<property name="timeBetweenEvictionRunsMillis" value="${redis.timeBetweenEvictionRunsMillis}"/>
<!--当连接给调用者使用时,是否检测空间超时-->
<property name="testWhileIdle" value="${redis.testWhileIdle}"/>
</bean>
<bean id="jedisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
destroy-method="destroy">
<property name="hostName" value="${redis.host}"/>
<property name="port" value="${redis.port}"/>
<property name="password" value="${redis.password}"/>
<property name="usePool" value="${redis.usePool}"/>
<property name="poolConfig" ref="jedisPoolConfig"/>
</bean>
<!--RedisTemplate:可操作对象,最终会将对象转化为字节(所以对象需支持序列化和反序列化)-->
<!--StringRedisTemplate:操作对象是String类型-->
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<!--开启事务支持-->
<property name="enableTransactionSupport" value="true"/>
<property name="connectionFactory" ref="jedisConnectionFactory"/>
<!-- 序列化方式 建议key/hashKey采用StringRedisSerializer。 -->
<property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
<property name="hashKeySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
<property name="valueSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
<property name="hashValueSerializer">
<bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer"/>
</property>
</bean>
</beans>
redis.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#一个pool可分配多少个jedis实例
redis.maxActive=2048
#一个pool最多可有多少个状态为idle(空闲)的jedis实例
redis.maxIdle=300
#当borrow一个jedis实例时,最大的等待时间,如果超过了等待时间,则直接抛出JedisConnectionException
redis.maxWaitMillis=10000
#在borrow一个jedis实例时,是否提前进行validate操作,如果为true,则得到的jedis实例时可用的
redis.testOnBorrow=true
#连接耗尽时是否阻塞,false报异常,true阻塞直到超时,默认为true
redis.testOnReturn=false
redis.blockWhenExhausted=true
redis.testWhileIdle=false
redis.timeBetweenEvictionRunsMillis=60000
redis.host=127.0.0.1
redis.port=6379
redis.password=huweitech
redis.usePool=true
Java Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.huweitech.redis.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Created by jarvan4dev on 2017/3/11.
*/
@ContextConfiguration(locations = "classpath:spring-context.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class RedisTest extends AbstractJUnit4SpringContextTests{
private static final String myWatchKey = "myWatchKey";
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Test
public void testWatch() throws InterruptedException {
System.out.println(redisTemplate.getDefaultSerializer());
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
valueOperations.set(myWatchKey, "10");
ExecutorService executor = Executors.newFixedThreadPool(200);
for (int i=0; i< 10000; i++) {
executor.execute(new SecKillRunnable(redisTemplate));
}
executor.awaitTermination(30, TimeUnit.SECONDS);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.huweitech.redis.test;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import java.util.List;
/**
* Created by jarvan4dev on 2017/3/16.
*/
public class SecKillRunnable implements Runnable{
private static final String myWatchKey = "myWatchKey";
private RedisTemplate<String, String> redisTemplate;
private ValueOperations<String, String> valueOperations;
public SecKillRunnable(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
this.valueOperations = redisTemplate.opsForValue();
}
@Override
public void run() {
System.out.println("线程执行");
redisTemplate.watch(myWatchKey);
Integer count = Integer.parseInt(valueOperations.get(myWatchKey));
System.out.println("当前剩余数量:" + count);
if (count > 0) {
System.out.println("进入");
redisTemplate.multi();
valueOperations.increment(myWatchKey, -1);
List<Object> list = redisTemplate.exec();
if (list != null) {
System.out.println("秒杀成功");
} else {
System.out.println("秒杀失败");
}
} else {
System.out.println("秒杀结束了");
}
}
}

关于spring-data-redis的RedisSerializer,请参看下面的第二篇参考链接。

参考文章

  1. http://lib.csdn.net/article/redis/18162
  2. http://blog.csdn.net/wangjun5159/article/details/52387782
我知道是不会有人点的,但万一有人想不开呢!