Redis:7.2.4 SpringBoot-redis-data-starter:3.2.3
I used RedisTemplate to implement a blocking queue and used RedisTemplate.execute() to execute script commands. However, when I create a thread in the test method and call it, Caused by: io.lettuce.core.RedisException: Connection will appear. is closed, I don’t know what’s going on, please help.
application.yaml
spring:
data:
redis:
# redis server主机地址
host: 192.168.198.131
# redis server端口
port: 6379
# redis server username
username:
# redis server password
password:
# redis 连接超时时间
timeout: 5000
# redis lettuce客户端配置,使用lettuce需要添加commons-pool2依赖,lettuce连接池基于commons-pool2
lettuce:
# lettuce 连接池配置
pool:
# lettuce连接池最大连接数,默认8
max-active: 8
# lettuce连接池最大空闲连接数,默认0
max-idle: 8
# lettuce连接池最小空闲连接数,默认0
min-idle: 1
# # lettuce连接池最大等待时间(单位毫秒),默认-1ms
max-wait: 1000
# jedis:
# pool:
# # pool连接池最大连接数,默认8
# max-active: 8
# # pool连接池最大空闲连接数,默认0
# max-idle: 0
# # pool连接池最小空闲连接数,默认0
# min-idle: 0
# # pool连接池最大等待时间(单位毫秒),默认-1ms
# max-wait: -1
RedisConfiguration:
package com.fly.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.*;
/**
* Redis配置类
* @author zchengfeng
*/
@Configuration
public class RedisConfiguration {
/**
* RedisTemplate是SpringBoot Redis整合包提供的用于操作Redis的模板类,提供了一列类操作
* Redis的API,默认采用JdkSerialization进行序列化,set后的数据实际上存储的是
* 二进制字节码,可读性非常差,通过自定义RedisTemplate
*
* @param factory redis连接工厂
* @return RedisTemplate
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
// Jackson2Json序列化
GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer =new GenericJackson2JsonRedisSerializer(objectMapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
/**
* 设置key采用String序列化方式,
* Redis存取默认使用JdkSerializationRedisSerializer序列化,
* 这种序列化会key的前缀添加奇怪的字符,例如\xac\xed\x00\x05t\x00user_id,
* 使用StringRedisSerializer序列化可以去掉这种字符
*/
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
// 设置连接工厂
template.setConnectionFactory(factory);
template.afterPropertiesSet();
/*
* 开启Redis事务,默认是关闭的。也可以手动开启事务,
* 通过template.multi()开启事务,template.exec()关闭事务
*/
// template.setEnableTransactionSupport(false);
return template;
}
}
queue.class:
package com.fly.structure.list;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.NoSuchElementException;
/**
* @Description 分布式阻塞队列
* @Author zchengfeng
* @Date 2024/2/26 01:07:29
*/
@Component
public class DistributedBlockingQueue<E> {
private final RedisTemplate<String, Object> redisTemplate;
private ListOperations<String, Object> listOperations;
private final static String PREFIX = "blockingQueue::";
/**
* 队列名称,可能存在多个队列,使用队列名称区分,队列名称也会作为Redis List结构的key
*/
private String queueName;
public DistributedBlockingQueue(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void setQueueName(String name) {
this.queueName = PREFIX + name;
}
/**
* 获取ListOperations实例用于List结构,避免多次调用opsForList()
* 获取ListOperations实例
*
* @return ListOperations实例
*/
public ListOperations<String, Object> getListOperations() {
listOperations = listOperations == null ? redisTemplate.opsForList() : listOperations;
return listOperations;
}
/**
* 检查队列名是否为null,为null将抛出NullPointerException
*/
public void checkQueueName() {
if (queueName == null) {
throw new NullPointerException("queueName is empty");
}
}
/**
* 获取队列的长度
*
* @return 队列的长度
*/
public Long size() {
checkQueueName();
return getListOperations().size(queueName);
}
/**
* 向队列尾部添加元素(入队),添加成功返回true,否则返回false
*
* @param e 被添加元素
* @return 添加结果
*/
public boolean add(E e) {
checkQueueName();
getListOperations().leftPush(queueName, e);
// try {
// getListOperations().leftPush(queueName, e);
// } catch (Exception ex) {
// System.out.println("ex:" + ex);
// return false;
// }
return true;
}
/**
* 从队列头部删除元素(出队),remove与poll()的区别在于,此方法不是阻塞式的,如果此队列为空,则抛出异常。
*
* @return 被删除的元素
*/
public E remove() {
if (size() == 0) {
throw new NoSuchElementException();
}
return (E) getListOperations().rightPop(queueName, 1);
}
/**
* 删除队列第一个元素并返回该元素,如果队列为空,则阻塞等待直到队列有元素。
*
* @return 返回队列的头部元素
*/
public E poll() {
// 执行redis命令获取结果
final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
String command = "BRPOP";
byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
byte[] timeout = "0".getBytes();
return conn.execute(command, keyArg, timeout);
});
return (E) element;
}
/**
* 删除队列第一个元素并返回该元素,如果队列为空,则阻塞等待直到队列有元素。
*
* @param timeout 阻塞的超时时间,单位ms
* @return 返回队列的头部元素
*/
public E poll(Long timeout) {
// 执行redis命令获取结果
final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(timeout);
// 执行 BRPOP KEY timeout 命令
return conn.execute("BRPOP", keyArg, buffer.array());
});
return (E) element;
}
/**
* 删除队列最后一个元素并返回该元素,如果此队列为空,则阻塞等待直到队列有元素。
*
* @return 返回队列的尾部元素
*/
public E peek() {
// 执行redis命令获取结果
final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
return conn.execute("BLPOP", keyArg);
});
return (E) element;
}
/**
* 删除队列最后一个元素并返回该元素,如果此队列为空,则阻塞等待直到队列有元素。
*
* @param timeout 阻塞的超时时间,单位ms
* @return 返回队列的尾部元素
*/
public E peek(long timeout) {
// 执行redis命令获取结果
final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(timeout);
// 执行 BLPOP KEY timeout 命令
return conn.execute("BLPOP", keyArg, buffer.array());
});
return (E) element;
}
/**
* 获取队列中所有元素。range start stop命令用于获取指定范围的元素,
* 当stop为-1时表示获取至队列末尾。
*
* @return 队列中所有元素
*/
public List<E> getItems() {
checkQueueName();
return (List<E>) getListOperations().range(queueName, 0, -1);
}
/**
* 清除队列中所有元素。LTRIM key start stop用于修剪list中start至stop的元素,
* 如果start大于stop就可以达到清除所有元素的效果
*/
public void clear() {
checkQueueName();
getListOperations().trim(queueName, 1, 0);
}
}
test.class:
package com.fly.structure.list;
import com.fly.RedisApplication;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @Description DistributedBlockingQueue测试类
* @Author zchengfeng
* @Date 2024/2/26 01:32:37
*/
@SpringBootTest(classes = RedisApplication.class)
public class DistributedBlockingQueueTest {
@Autowired
DistributedBlockingQueue<String> queue;
@Test
public void blockingQueueTest() {
queue.setQueueName("bQueue");
System.out.println("xxx:" + queue.add("item1")); // OK
// 队列消费者
new Thread(() -> {
System.out.println("xxx:" + queue.add("item1")); // ERROR,Connection is closed
}).start();
}
}
Connection is closed when used internally in Thread, but it is normal when used externally.
Comment From: wilkinsona
Spring Boot isn't involved at this level of Spring Data Redis and Lettuce's behavior. If you're looking for some help, Stack Overflow is a better place to ask. Or, if you believe you have found a bug, please open a Spring Data Redis issue.