Redis:7.2.4 SpringBoot-redis-data-starter:3.2.3

I used RedisTemplate to implement a blocking queue and used RedisTemplate.execute() to execute script commands. However, when I create a thread in the test method and call it, Caused by: io.lettuce.core.RedisException: Connection will appear. is closed, I don’t know what’s going on, please help.

application.yaml

spring:
  data:
    redis:
      # redis server主机地址
      host: 192.168.198.131
      # redis server端口
      port: 6379
      # redis server username
      username:
      # redis server password
      password:
      # redis 连接超时时间
      timeout: 5000
      # redis lettuce客户端配置,使用lettuce需要添加commons-pool2依赖,lettuce连接池基于commons-pool2
      lettuce:
        # lettuce 连接池配置
        pool:
          # lettuce连接池最大连接数,默认8
          max-active: 8
          # lettuce连接池最大空闲连接数,默认0
          max-idle: 8
          # lettuce连接池最小空闲连接数,默认0
          min-idle: 1
          # # lettuce连接池最大等待时间(单位毫秒),默认-1ms
          max-wait: 1000
#      jedis:
#        pool:
#          # pool连接池最大连接数,默认8
#          max-active: 8
#          # pool连接池最大空闲连接数,默认0
#          max-idle: 0
#          # pool连接池最小空闲连接数,默认0
#          min-idle: 0
#          # pool连接池最大等待时间(单位毫秒),默认-1ms
#          max-wait: -1

RedisConfiguration:

package com.fly.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.*;

/**
 * Redis配置类
 * @author zchengfeng
 */
@Configuration
public class RedisConfiguration {

    /**
     * RedisTemplate是SpringBoot Redis整合包提供的用于操作Redis的模板类,提供了一列类操作
     * Redis的API,默认采用JdkSerialization进行序列化,set后的数据实际上存储的是
     * 二进制字节码,可读性非常差,通过自定义RedisTemplate
     *
     * @param factory redis连接工厂
     * @return RedisTemplate
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();


        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        // Jackson2Json序列化
        GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer =new GenericJackson2JsonRedisSerializer(objectMapper);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        /**
         * 设置key采用String序列化方式,
         * Redis存取默认使用JdkSerializationRedisSerializer序列化,
         * 这种序列化会key的前缀添加奇怪的字符,例如\xac\xed\x00\x05t\x00user_id,
         * 使用StringRedisSerializer序列化可以去掉这种字符
         */
        template.setKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        // 设置连接工厂
        template.setConnectionFactory(factory);
        template.afterPropertiesSet();
        /*
         * 开启Redis事务,默认是关闭的。也可以手动开启事务,
         * 通过template.multi()开启事务,template.exec()关闭事务
         */
//        template.setEnableTransactionSupport(false);
        return template;
    }
}

queue.class:

package com.fly.structure.list;

import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * @Description 分布式阻塞队列
 * @Author zchengfeng
 * @Date 2024/2/26 01:07:29
 */
@Component
public class DistributedBlockingQueue<E> {
    private final RedisTemplate<String, Object> redisTemplate;
    private ListOperations<String, Object> listOperations;
    private final static String PREFIX = "blockingQueue::";
    /**
     * 队列名称,可能存在多个队列,使用队列名称区分,队列名称也会作为Redis List结构的key
     */
    private String queueName;

    public DistributedBlockingQueue(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void setQueueName(String name) {
        this.queueName = PREFIX + name;
    }


    /**
     * 获取ListOperations实例用于List结构,避免多次调用opsForList()
     * 获取ListOperations实例
     *
     * @return ListOperations实例
     */
    public ListOperations<String, Object> getListOperations() {
        listOperations = listOperations == null ? redisTemplate.opsForList() : listOperations;
        return listOperations;
    }

    /**
     * 检查队列名是否为null,为null将抛出NullPointerException
     */
    public void checkQueueName() {
        if (queueName == null) {
            throw new NullPointerException("queueName is empty");
        }
    }

    /**
     * 获取队列的长度
     *
     * @return 队列的长度
     */
    public Long size() {
        checkQueueName();
        return getListOperations().size(queueName);
    }

    /**
     * 向队列尾部添加元素(入队),添加成功返回true,否则返回false
     *
     * @param e 被添加元素
     * @return 添加结果
     */
    public boolean add(E e) {
        checkQueueName();
        getListOperations().leftPush(queueName, e);
//        try {
//            getListOperations().leftPush(queueName, e);
//        } catch (Exception ex) {
//            System.out.println("ex:" + ex);
//            return false;
//        }
        return true;
    }


    /**
     * 从队列头部删除元素(出队),remove与poll()的区别在于,此方法不是阻塞式的,如果此队列为空,则抛出异常。
     *
     * @return 被删除的元素
     */
    public E remove() {
        if (size() == 0) {
            throw new NoSuchElementException();
        }
        return (E) getListOperations().rightPop(queueName, 1);
    }

    /**
     * 删除队列第一个元素并返回该元素,如果队列为空,则阻塞等待直到队列有元素。
     *
     * @return 返回队列的头部元素
     */
    public E poll() {
        // 执行redis命令获取结果
        final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
            String command = "BRPOP";
            byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
            byte[] timeout = "0".getBytes();
            return conn.execute(command, keyArg, timeout);
        });
        return (E) element;
    }


    /**
     * 删除队列第一个元素并返回该元素,如果队列为空,则阻塞等待直到队列有元素。
     *
     * @param timeout 阻塞的超时时间,单位ms
     * @return 返回队列的头部元素
     */
    public E poll(Long timeout) {
        // 执行redis命令获取结果
        final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
            byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(timeout);
            // 执行 BRPOP KEY timeout 命令
            return conn.execute("BRPOP", keyArg, buffer.array());
        });
        return (E) element;
    }

    /**
     * 删除队列最后一个元素并返回该元素,如果此队列为空,则阻塞等待直到队列有元素。
     *
     * @return 返回队列的尾部元素
     */
    public E peek() {
        // 执行redis命令获取结果
        final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
            byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
            return conn.execute("BLPOP", keyArg);
        });
        return (E) element;
    }

    /**
     * 删除队列最后一个元素并返回该元素,如果此队列为空,则阻塞等待直到队列有元素。
     *
     * @param timeout 阻塞的超时时间,单位ms
     * @return 返回队列的尾部元素
     */
    public E peek(long timeout) {
        // 执行redis命令获取结果
        final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
            byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(timeout);
            // 执行 BLPOP KEY timeout 命令
            return conn.execute("BLPOP", keyArg, buffer.array());
        });
        return (E) element;
    }

    /**
     * 获取队列中所有元素。range start stop命令用于获取指定范围的元素,
     * 当stop为-1时表示获取至队列末尾。
     *
     * @return 队列中所有元素
     */
    public List<E> getItems() {
        checkQueueName();
        return (List<E>) getListOperations().range(queueName, 0, -1);
    }

    /**
     * 清除队列中所有元素。LTRIM key start stop用于修剪list中start至stop的元素,
     * 如果start大于stop就可以达到清除所有元素的效果
     */
    public void clear() {
        checkQueueName();
        getListOperations().trim(queueName, 1, 0);
    }
}

test.class:

package com.fly.structure.list;

import com.fly.RedisApplication;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @Description DistributedBlockingQueue测试类
 * @Author zchengfeng
 * @Date 2024/2/26 01:32:37
 */
@SpringBootTest(classes = RedisApplication.class)
public class DistributedBlockingQueueTest {
    @Autowired
    DistributedBlockingQueue<String> queue;

    @Test
    public void blockingQueueTest() {
        queue.setQueueName("bQueue");
        System.out.println("xxx:" + queue.add("item1")); // OK
        // 队列消费者
        new Thread(() -> {
            System.out.println("xxx:" + queue.add("item1")); // ERROR,Connection is closed
        }).start();
    }
}

Connection is closed when used internally in Thread, but it is normal when used externally.

Comment From: wilkinsona

Spring Boot isn't involved at this level of Spring Data Redis and Lettuce's behavior. If you're looking for some help, Stack Overflow is a better place to ask. Or, if you believe you have found a bug, please open a Spring Data Redis issue.