glmapper

SpringSession系列-存储机制之Redis&Map

字数统计: 2.4k阅读时长: 10 min
2018/12/15 Share

@TOC
在之前的文章中已经对SpringSession的功能结构,请求/响应重写等做了介绍。本文将继续来介绍下SpringSession中存储部分的设计。存储是分布式session中算是最核心的部分,通过引入三方的存储容器来实现session的存储,从而有效的解决session共享的问题。

1、SpringSession存储的顶级抽象接口

SpringSession存储的顶级抽象接口是org.springframework.session包下的SessionRepository这个接口。SessionRepository的类图结构如下:

这里先来看下SessionRepository这个顶层接口中定义了哪些方法:

1
2
3
4
5
6
7
8
9
10
public interface SessionRepository<S extends Session> {
//创建一个session
S createSession();
//保存session
void save(S session);
//通过ID查找session
S findById(String id);
//通过ID删除一个session
void deleteById(String id);
}

从代码来看还是很简单的,就是增删查。下面看具体实现。在2.0版本开始SpringSession中也提供了一个和SessionRepository具体相同能力的ReactiveSessionRepository,用于支持响应式编程模式。

2、MapSessionRepository

基于HashMap实现的基于内存存储的存储器实现,这里就主要看下对于接口中几个方法的实现。

1
2
3
4
5
public class MapSessionRepository implements SessionRepository<MapSession> {
private Integer defaultMaxInactiveInterval;
private final Map<String, Session> sessions;
//...
}

可以看到就是一个Map,那后面关于增删查其实就是操作这个Map了。

createSession

1
2
3
4
5
6
7
8
9
@Override
public MapSession createSession() {
MapSession result = new MapSession();
if (this.defaultMaxInactiveInterval != null) {
result.setMaxInactiveInterval(
Duration.ofSeconds(this.defaultMaxInactiveInterval));
}
return result;
}

这里很直接,就是new了一个MapSession,然后设置了session的有效期。

save

1
2
3
4
5
6
7
@Override
public void save(MapSession session) {
if (!session.getId().equals(session.getOriginalId())) {
this.sessions.remove(session.getOriginalId());
}
this.sessions.put(session.getId(), new MapSession(session));
}

这里面先判断了session中的两个ID,一个originalId,一个当前idoriginalId是第一次生成session对象时创建的,后面都不会在变化。通过源码来看,对于originalId,只提供了get方法。对于id呢,其实是可以通过changeSessionId来改变的。

这里的这个操作实际上是一种优化行为,及时的清除掉老的session数据来释放内存空间。

findById

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public MapSession findById(String id) {
Session saved = this.sessions.get(id);
if (saved == null) {
return null;
}
if (saved.isExpired()) {
deleteById(saved.getId());
return null;
}
return new MapSession(saved);
}

这个逻辑也很简单,先从Map中根据id取出session数据,如果没有就返回null,如果有则再判断下是否过期了,如果过期了就删除掉,然后返回null。如果查到了,并且没有过期的话,则构建一个MapSession返回。

OK,基于内存存储的实现系列就是这些了,下面继续来看其他存储的实现。

3、FindByIndexNameSessionRepository

FindByIndexNameSessionRepository继承了SessionRepository接口,用于扩展对第三方存储的实现。

1
2
3
4
5
6
7
8
9
10
11
12
public interface FindByIndexNameSessionRepository<S extends Session>
extends SessionRepository<S> {

String PRINCIPAL_NAME_INDEX_NAME = FindByIndexNameSessionRepository.class.getName()
.concat(".PRINCIPAL_NAME_INDEX_NAME");

Map<String, S> findByIndexNameAndIndexValue(String indexName, String indexValue);

default Map<String, S> findByPrincipalName(String principalName) {
return findByIndexNameAndIndexValue(PRINCIPAL_NAME_INDEX_NAME, principalName);
}
}

FindByIndexNameSessionRepository添加一个单独的方法为指定用户查询所有会话。这是通过设置名为FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAMESession的属性值为指定用户的username来完成的。开发人员有责任确保属性被赋值,因为SpringSession不会在意被使用的认证机制。官方文档中给出的例子如下:

1
2
3
String username = "username";
this.session.setAttribute(
FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME, username);

FindByIndexNameSessionRepository的一些实现会提供一些钩子自动的索引其他的session属性。比如,很多实现都会自动的确保当前的Spring Security用户名称可通过索引名称FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME进行索引。一旦会话被索引,就可以通过下面的代码检索:

1
2
3
4
String username = "username";
Map<String, Session> sessionIdToSession =
this.sessionRepository.findByIndexNameAndIndexValue(
FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME,username);

下图是FindByIndexNameSessionRepository接口的三个实现类:
FindByIndexNameSessionRepository

下面来分别分析下这三个存储的实现细节。

3.1 RedisOperationsSessionRepository

RedisOperationsSessionRepository的类图结构如下,MessageListenerredis消息订阅的监听接口。
在这里插入图片描述

代码有点长,就不在这里面贴了,一些注释可以在这个 SpringSession中文分支 来看。这里还是主要来看下对于那几个方法的实现。

3.1.1 createSession

这里和MapSessionRepository的实现基本一样的,那区别就在于Session的封装模型不一样,这里是RedisSession,实际上RedisSession的实现是对MapSession又包了一层。下面会分析RedisSession这个类。

1
2
3
4
5
6
7
8
9
10
@Override
public RedisSession createSession() {
// RedisSession,这里和MapSession区别开
RedisSession redisSession = new RedisSession();
if (this.defaultMaxInactiveInterval != null) {
redisSession.setMaxInactiveInterval(
Duration.ofSeconds(this.defaultMaxInactiveInterval));
}
return redisSession;
}

在看其他两个方法之前,先来看下RedisSession这个类。

3.1.2 RedisSession

这个在模型上是对MapSession的扩展,增加了delta这个东西。

1
2
3
4
5
6
7
8
9
10
11
12
final class RedisSession implements Session {
// MapSession 实例对象,主要存数据的地方
private final MapSession cached;
// 原始最后访问时间
private Instant originalLastAccessTime;
private Map<String, Object> delta = new HashMap<>();
// 是否是新的session对象
private boolean isNew;
// 原始主名称
private String originalPrincipalName;
// 原始sessionId
private String originalSessionId;

delta是一个Map结构,那么这里面到底是放什么的呢?具体细节见 saveDelta 这个方法。saveDelta 这个方法会在两个地方被调用,一个是下面要说道的save方法,另外一个是 flushImmediateIfNecessary 这个方法:

1
2
3
4
5
private void flushImmediateIfNecessary() {
if (RedisOperationsSessionRepository.this.redisFlushMode == RedisFlushMode.IMMEDIATE) {
saveDelta();
}
}

RedisFlushMode提供了两种推送模式:

  • ON_SAVE:只有在调用save方法时执行,在web环境中这样做通常是尽快提交HTTP响应
  • IMMEDIATE:只要有变更就会直接写到redis中,不会像ON_SAVE一样,在最后commit时一次性写入

追踪flushImmediateIfNecessary 方法调用链如下:
在这里插入图片描述
那么到这里基本就清楚了,首先save这个方法,当主动调用save时就是将数据推到redis中去的,也就是ON_SAVE这种情况。那么对于IMMEDIATE这种情况,只有调用了上面的四个方法,SpringSession 才会将数据推送到redis

所以delta里面存的是当前一些变更的 key-val 键值对象,而这些变更是由setAttributeremoveAttributesetMaxInactiveIntervalInSecondssetLastAccessedTime这四个方法触发的;比如setAttribute(k,v),那么这个k->v就会被保存到delta里面。

3.1.3 save

在理解了saveDelta方法之后再来看save方法就简单多了。save 对应的就是RedisFlushMode.ON_SAVE

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void save(RedisSession session) {
// 直接调用 saveDelta推数据到redis
session.saveDelta();
if (session.isNew()) {
// sessionCreatedKey->channl
String sessionCreatedKey = getSessionCreatedChannel(session.getId());
// 发布一个消息事件,新增 session,以供 MessageListener 回调处理。
this.sessionRedisOperations.convertAndSend(sessionCreatedKey, session.delta);
session.setNew(false);
}
}

3.1.4 findById

查询这部分和基于Map的差别比较大,因为这里并不是直接操作Map,而是与Redis 进行一次交互。

1
2
3
4
@Override
public RedisSession findById(String id) {
return getSession(id, false);
}

调用getSession方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private RedisSession getSession(String id, boolean allowExpired) {
// 根据ID从redis中取出数据
Map<Object, Object> entries = getSessionBoundHashOperations(id).entries();
if (entries.isEmpty()) {
return null;
}
//转换成MapSession
MapSession loaded = loadSession(id, entries);
if (!allowExpired && loaded.isExpired()) {
return null;
}
//转换成RedisSession
RedisSession result = new RedisSession(loaded);
result.originalLastAccessTime = loaded.getLastAccessedTime();
return result;
}

loadSession中构建MapSession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private MapSession loadSession(String id, Map<Object, Object> entries) {
// 生成MapSession实例
MapSession loaded = new MapSession(id);
//遍历数据
for (Map.Entry<Object, Object> entry : entries.entrySet()) {
String key = (String) entry.getKey();
if (CREATION_TIME_ATTR.equals(key)) {
// 设置创建时间
loaded.setCreationTime(Instant.ofEpochMilli((long) entry.getValue()));
}
else if (MAX_INACTIVE_ATTR.equals(key)) {
// 设置最大有效时间
loaded.setMaxInactiveInterval(Duration.ofSeconds((int) entry.getValue()));
}
else if (LAST_ACCESSED_ATTR.equals(key)) {
// 设置最后访问时间
loaded.setLastAccessedTime(Instant.ofEpochMilli((long) entry.getValue()));
}
else if (key.startsWith(SESSION_ATTR_PREFIX)) {
// 设置属性
loaded.setAttribute(key.substring(SESSION_ATTR_PREFIX.length()),
entry.getValue());
}
}
return loaded;
}

3.1.5 deleteById

根据sessionId删除session数据。具体过程看代码注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void deleteById(String sessionId) {
// 获取 RedisSession
RedisSession session = getSession(sessionId, true);
if (session == null) {
return;
}
// 清楚当前session数据的索引
cleanupPrincipalIndex(session);
//执行删除操作
this.expirationPolicy.onDelete(session);
String expireKey = getExpiredKey(session.getId());
//删除expireKey
this.sessionRedisOperations.delete(expireKey);
//session有效期设置为0
session.setMaxInactiveInterval(Duration.ZERO);
save(session);
}

3.1.6 onMessage

最后来看下这个订阅回调处理。这里看下核心的一段逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
boolean isDeleted = channel.equals(this.sessionDeletedChannel);
// Deleted 还是 Expired ?
if (isDeleted || channel.equals(this.sessionExpiredChannel)) {
// 此处省略无关代码
// Deleted
if (isDeleted) {
// 发布一个 SessionDeletedEvent 事件
handleDeleted(session);
}
// Expired
else {
// 发布一个 SessionExpiredEvent 事件
handleExpired(session);
}
}

3.2 Redis 存储的一些思考

首先按照我们自己常规的思路来设计的话,我们会怎么来考虑这个事情。这里首先要声明下,我对 Redis 这个东西不是很熟,没有做过深入的研究;那如果是我来做,可能也就仅仅限于存储。

  • findByIndexNameAndIndexValue的设计,这个的作用是通过indexNameindexValue来返回当前用户的所有会话。但是这里需要考虑的一个事情是,通常情况下,一个用户只会关联到一个会话上面去,那这种设计很显然,我的理解是为了支持单用户多会话的场景。
    • indexName:FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME
    • indexValue:username
  • 实现 MessageListener 接口,增加事件通知能力。通过监听这些事件,可以做一些session操作管控。但是实际上 SpringSession 中并没有做任何事情,从代码来看,publishEvent方法是空实现。等待回复中 #issue 1287

    1
    2
    3
    4
    5
    6
    7
    8
    private ApplicationEventPublisher eventPublisher = new ApplicationEventPublisher() {
    @Override
    public void publishEvent(ApplicationEvent event) {
    }
    @Override
    public void publishEvent(Object event) {
    }
    };
  • RedisFlushModeSpringSession中提供了两种模式的推送,一种是ON_SAVE,另外一种是IMMEDIATE。默认是ON_SAVE,也就是常规的在请求处理结束时进行一次sessionCommit操作。RedisFlushMode 的设计感觉是为session数据持久化的时机提供了另外一种思路。

小结

存储机制设计部分就一基于内存和基于Redis两种来分析;另外基于jdbchazelcast有兴趣的同学可以自己查看源码。

最后也欢迎访问我的个人博客:www.glmapper.com

参考

原文作者:GuoLei Song

原文链接:http://www.glmapper.com/2018/12/15/spring-session-redis-map/

发表日期:December 15th 2018, 5:30:38 pm

更新日期:December 22nd 2018, 4:10:10 pm

版权声明:转载请注明出处

CATALOG
  1. 1. 1、SpringSession存储的顶级抽象接口
  2. 2. 2、MapSessionRepository
    1. 2.1. createSession
    2. 2.2. save
    3. 2.3. findById
  3. 3. 3、FindByIndexNameSessionRepository
    1. 3.1. 3.1 RedisOperationsSessionRepository
      1. 3.1.1. 3.1.1 createSession
      2. 3.1.2. 3.1.2 RedisSession
      3. 3.1.3. 3.1.3 save
      4. 3.1.4. 3.1.4 findById
      5. 3.1.5. 3.1.5 deleteById
      6. 3.1.6. 3.1.6 onMessage
    2. 3.2. 3.2 Redis 存储的一些思考
  4. 4. 小结
  5. 5. 参考