[Database] Spring Data Redis에서 Pipeline 활용하기

2025. 12. 29. 17:30DataBase

 

 

이번 포스팅은 Redis 파이프라인에 대해서 알아보고 사용해 보는 과정을 포스팅하겠습니다.

 

redis pipeline이란?

우선 redis는 클라이언트와 TCP/IP 소켓 통신을 기반으로 통신하며 기본적으로 요청을 보낼 때 아래와 같이 수행합니다. 

요청 -> 응답, 요청 -> 응답 (네트워크를 계속 왔다 갔다 함)

 

3-way handshake를 수행하면서 수신자가 받을 준비가 되어있는지, 송신자가 보낼 준비가 되어있는지를 체크합니다.

 

이 체크가 모두 완료 되고나서야 실제로 패킷이 전송되며 통신이 이루어집니다. 이러한 체크과정이 TCP 통신에선 필수로 포함되어야하기 때문에 다량의 요청을 단발성으로 여러번 보내게 되면 그만큼의 통신 체크 과정도 여러번 수행하므로 그만큼의 오버헤드가 발생합니다.

 

따라서 다량의 redis 명령어를 실행시킬때에는 송신지부터 목적지 호스트까지 패킷이 왕복하는게 걸리는 시간을 의미하는 RTT(Round Trip Time)가 매우 늘어날 수 있습니다.  

 

이러한 상황을 개선하기 위해 pipeline을 이용합니다.

 

먼저 Redis 클라이언트가 Redis 서버에 연결합니다. 이 때 클라이언트와 서버 간의 TCP/IP 연결이 설정됩니다. 

그 후 클라이언트는 파이프라인을 시작합니다. 이때, 여러 개의 Redis 명령을 파이프라인에 추가할 수 있습니다. 

파이프라인에 추가된 명령들은 클라이언트가 EXECUTE 명령을 호출하기 전까지 서버로 전송되지 않습니다. 

클라이언트가 EXECUTE 명령을 호출하면 파이프라인에 추가된 모든 명령이 일괄로 Redis 서버로 전송됩니다. 

Redis 서버는 파이프라인에 추가된 모든 명령을 받아 처리합니다. 이 때 명령들은 순서대로 처리됩니다.

클라이언트가 수신하는 파이프라인 실행 결과도 명령이 실행된 순서대로 반환됩니다. 

 

TCP/IP 통신을 잘 모르겠다면?

https://aws-hyoh.tistory.com/57

 

TCP/IP 쉽게 이해하기

IT 분야에서 실무를 담당하시는 분들뿐만 아니라 학생, IT 쪽에 조금이라도 관심이 있는 분들이라면 TCP/IP에 대해 들어보셨을 겁니다. 저 또한 학부시절에 TCP/IP에 대해서 여러 번 들어보았는데요.

aws-hyoh.tistory.com

 

redis pipelining 공식 문서

https://redis.io/docs/latest/develop/using-commands/pipelining/

 

Redis pipelining

How to optimize round-trip times by batching Redis commands

redis.io

 

코드

build.gradle 파일에 spring-data-redis 라이브러리를 추가합니다.

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-data-redis'
}

 

application.yml에는 redis의 host, port를 명세합니다.

spring:
  application:
    name: RedisPipeline

  data:
    redis:
      host: localhost
      port: 6379

 

앞서 추가한 라이브러리를 이용해 redis를 사용하기 위해서는 configuration 파일을 작성합니다.

applicatoin.yml에서 명세해두었던 host와 port를 변수로 선언하고 redisTemplate 메서드를 작성합니다.  

@Configuration
public class RedisConfig {
    @Value("${spring.data.redis.host}")
    private String host;

    @Value("${spring.data.redis.port}")
    private int port;

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(host, port);
    }

    @Bean
    public RedisTemplate<?, ?> redisTemplate() {
        RedisTemplate<?, ?> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        return redisTemplate;
    }
}

 

실제 실습하는 부분 코드가 들어갈 클래스를 작성합니다. 

아래는 전체 코드이며 한 부분씩 자세하게 살펴보겠습니다.

@Service
public class RedisPipelineService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    // 비교군: 파이프라인 없이 10,000개 입력
    public void insertWithoutPipeline() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        for (int i = 0; i < 10000; i++) {
            // 매번 네트워크를 타고 Redis에 갔다 옴 (RTT 10,000번 발생)
            redisTemplate.opsForValue().set("normal:" + i, "value" + i);
        }

        stopWatch.stop();
        System.out.println("일반 방식 소요 시간: " + stopWatch.getTotalTimeSeconds() + "초");
    }

    // 파이프라인을 사용한 10,000개 입력
    public void insertWithPipeline() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        /* executePipelined 메서드가 파이프라인의 시작입니다.
           이 메서드는 내부적으로 Redis 연결을 하나 잡고,
           명령어들을 바로 실행하지 않고 큐(Queue)에 쌓아둡니다.
        */
        List<Object> results = redisTemplate.executePipelined(
                new SessionCallback<Object>() {
                    @Override
                    public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {

                        // 여기서 operations는 파이프라인 모드로 동작하는 프록시 객체입니다.
                        for (int i = 0; i < 10000; i++) {
                            String key = "pipeline:" + i;
                            String value = "value" + i;

                            // 이 명령어는 즉시 실행되지 않고 버퍼에 쌓입니다.
                            // 따라서, 실제 Redis에는 아직 아무것도 전송되지 않았습니다.
                            operations.opsForValue().set((K) key, (V) value);
                        }

                        // [핵심 포인트] 왜 null을 반환해야 할까요?
                        // 파이프라인 내부에서는 개별 명령어의 응답을 바로 받을 수 없습니다.
                        // 모든 응답은 나중에 List<Object> results로 한꺼번에 돌아옵니다.
                        // 따라서 여기서는 반환할 값이 없으므로 null을 리턴합니다.
                        return null;
                    }
                }
        );

        stopWatch.stop();
        System.out.println("파이프라인 방식 소요 시간: " + stopWatch.getTotalTimeSeconds() + "초");
    }
}

 

앞서 configuration 파일에서 bean으로 설정했던 redisTemplate을 <String,String>타입으로 의존성을 주입합니다.

@Autowired
private RedisTemplate<String, String> redisTemplate;

 

아래는 redis pipeline을 사용하지 않고 명령어를 수행하는 메서드입니다.

추후에 소요되는 시간을 측정하기 위해서 Stopwatch 객체를 생성해서 사용합니다.

for문을 이용해 10000번의 set 명령어를 수행합니다. 

// 비교군: 파이프라인 없이 10,000개 입력
    public void insertWithoutPipeline() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        for (int i = 0; i < 10000; i++) {
            // 매번 네트워크를 타고 Redis에 갔다 옴 (RTT 10,000번 발생)
            redisTemplate.opsForValue().set("normal:" + i, "value" + i);
        }

        stopWatch.stop();
        System.out.println("일반 방식 소요 시간: " + stopWatch.getTotalTimeSeconds() + "초");
    }

 

다음은 redis pipeline을 사용해서 명령어를 수행하는 메서드입니다.

pipeline을 사용하기 위해서는 executePipelined() 메서드를 호출합니다.

executePipelined()메서드의 파라미터로 SessionCallback이라는 인터페이스를 즉석에서 구현한 객체를 넘겨줍니다.

별도의 클래스 파일(ex. MyCallback.java)을 만들지 않고 new로 선언해 생성합니다.

List<Object> results = redisTemplate.executePipelined(
                new SessionCallback<Object>() {
                ...

 

SessionCallback 인터페이스는 execute라는 메서드를 반드시 구현해야합니다.

execute 메서드 안에 실행하고 싶은 redis 명령어를 적습니다.

@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {

    // 여기서 operations는 파이프라인 모드로 동작하는 프록시 객체입니다.
    for (int i = 0; i < 10000; i++) {
        String key = "pipeline:" + i;
        String value = "value" + i;

        // 이 명령어는 즉시 실행되지 않고 버퍼에 쌓입니다.
        // 따라서, 실제 Redis에는 아직 아무것도 전송되지 않았습니다.
        operations.opsForValue().set((K) key, (V) value);
    }

    return null;
}

 

execute 메서드의 파라미터인 RedisOperations operations는 프록시로 redis에 보낼 명령어들을 operations에 담아 한꺼번에 보내는 역할을 합니다. 

RedisOperations는 이름에서 알 수 있다시피 다양한 redis를 명령어를 실행할 수 있는 함수들이 포함되어 있습니다. 

 

RedisOperations에 대해서 더 궁금하다면 아래 공식문서를 참고하시길 바랍니다.

https://docs.spring.io/spring-data/data-redis/docs/current/api/org/springframework/data/redis/core/RedisOperations.html

 

RedisOperations (Spring Data Redis 4.0.1 API)

Interface that specified a basic set of Redis operations, implemented by RedisTemplate. Not often used but a useful option for extensibility and testability (as it can be easily mocked or stubbed). Redis command methods are exempted from the default non-nu

docs.spring.io

 

따라서 execute 메서드는 작성된 operation의 redis 명령어들을 queue에 차례차례 담고 작업이 끝나면 redis에 한꺼번에 보냅니다. 

 

마지막에 null을 리턴하는 이유는 pipeline 내부에서는 개별 명령어 각각이 바로 실행되지 않기 때문에 응답을 바로 받을 수 없기 때문입니다. 따라서 모든 응답은 나중에 List<Object> 타입의 results변수에 담겨서 받게 됩니다.

List<Object> results = redisTemplate.executePipelined( ...

 

현재 코드에서는 set 명령어만 작성했으니 results에 담겨져 돌아오는게 없겠지만 get을 할 경우에는 응답 결과가 results에 담겨져서 돌아옵니다.

따라서 실행할 명령어를 차례차례 queue에 담고 redis에 보내는 execute 함수에서는 반환할 값이 없으므로 null을 리턴합니다.

 

위 코드를 합쳐서 람다 형식으로 작성하면 조금 더 가독성 있게 작성할 수 있습니다.

List<Object> results = redisTemplate.executePipelined((RedisOperations operations) -> {
    for (int i = 0; i < 10000; i++) {
        operations.opsForValue().set("pipeline:" + i, "value" + i);
    }
    return null;
});

 

작성한 코드를 실행하기 위해 메인 함수에 다음과 같이 작성합니다. 

CommandLineRunner 클래스를 implements합니다. 따라서 run 메서드를 오버라이드하여 앞서 작성했던 함수들을 스프링 부트 구동 시에 호출합니다.

@SpringBootApplication
public class RedisPipelineApplication implements CommandLineRunner {

    @Autowired
    private RedisPipelineService redisPipelineService;

    public static void main(String[] args) {
        SpringApplication.run(RedisPipelineApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("============= [Redis 실습 시작] =============");

        // 1. 일반 방식 테스트
        System.out.println("1. 일반 방식(No Pipeline) 실행 중...");
        redisPipelineService.insertWithoutPipeline();

        System.out.println("-------------------------------------------");

        // 2. 파이프라인 방식 테스트
        System.out.println("2. 파이프라인(Pipeline) 방식 실행 중...");
        redisPipelineService.insertWithPipeline();

        System.out.println("============= [Redis 실습 종료] =============");
    }

}

 

CommandLineRunner에 대해서 더 궁금하다면 아래 블로그를 참고하시길 바랍니다.

https://www.daleseo.com/spring-boot-runners/

 

스프링 부트 구동 시점에 특정 코드 실행 시키기 (CommandLineRunner & ApplicationRunner)

Engineering Blog by Dale Seo

www.daleseo.com

 

테스트 결과 파이프라인을 이용한 방법이 일반적인 방법보다 2.5303382초 빠르며 약 5배 성능이 향상되었음을 확인할 수 있습니다. 실습 과정에서는 host를 localhost로 설정하고 테스트 했음에도 약간의 성능 향상을 관찰할 수 있었지만 local이 아닌 AWS ElasticCache와 같은 서비스를 이용 중일때 pipelining을 이용한다면 더욱 큰 성능 향상의 효과를 기대할 수 있습니다. 

 

아래는 실습 코드 github 주소입니다. 

https://github.com/YUBIN-githubb/RedisPipeline.git

 

GitHub - YUBIN-githubb/RedisPipeline

Contribute to YUBIN-githubb/RedisPipeline development by creating an account on GitHub.

github.com

 

 

아래는 Redis pipelining을 공부하며 참고했던 공식문서와 블로그 글입니다.

https://docs.spring.io/spring-data/redis/reference/redis/pipelining.html

 

Pipelining :: Spring Data Redis

The Lettuce driver supports fine-grained flush control that allows to either flush commands as they appear, buffer or send them at connection close. LettuceConnectionFactory factory = // ... factory.setPipeliningFlushPolicy(PipeliningFlushPolicy.buffered(3

docs.spring.io

https://junghyungil.tistory.com/220

 

[Redis] Redis Pipeline

Redis Pipeline에 설명하기에 앞서 No Pipeline에 관해 간략하 말씀드리겠습니다. redis는 요청을 보낼 때 일반적으로 아래와 같이 수행됩니다. 클라이언트는 서버에 쿼리를 보내고 일반적으로 차단 방

junghyungil.tistory.com

https://velog.io/@horang12/Redis-pipeline%EC%9D%84-%EC%9D%B4%EC%9A%A9%ED%95%B4%EB%B3%B4%EC%9E%90

 

Redis Pipeline을 이용해보자(+WireShark로 확인해보자)

이 메서드를 보면 레디스에 요청을 보내고, 응답을 받고 이 과정이 여러번에 걸쳐서 반복된다. 레디스 로그를 보면 매칭 프로필을 조회를 하는데 한번에 5번이나 요청이 오는 것을 볼 수 있다.

velog.io

https://icemelon404.tistory.com/25

 

Redis pipelining (feat. Spring + Lettuce)

Redis piplining 의 장점과 구현 방법 등을 설명하기 전에, plain 한 request/response 모델을 먼저 살펴보겠습니다. 클라이언트는 쿼리를 보내고 blocking 하게 응답을 기다립니다. ack 를 받으면 다시 다음 쿼

icemelon404.tistory.com

https://aws-hyoh.tistory.com/57

 

TCP/IP 쉽게 이해하기

IT 분야에서 실무를 담당하시는 분들뿐만 아니라 학생, IT 쪽에 조금이라도 관심이 있는 분들이라면 TCP/IP에 대해 들어보셨을 겁니다. 저 또한 학부시절에 TCP/IP에 대해서 여러 번 들어보았는데요.

aws-hyoh.tistory.com

https://redis.io/docs/latest/develop/using-commands/pipelining/

 

Redis pipelining

How to optimize round-trip times by batching Redis commands

redis.io

https://docs.spring.io/spring-data/data-redis/docs/current/api/org/springframework/data/redis/core/RedisOperations.html

 

RedisOperations (Spring Data Redis 4.0.1 API)

Interface that specified a basic set of Redis operations, implemented by RedisTemplate. Not often used but a useful option for extensibility and testability (as it can be easily mocked or stubbed). Redis command methods are exempted from the default non-nu

docs.spring.io

https://www.daleseo.com/spring-boot-runners/

 

스프링 부트 구동 시점에 특정 코드 실행 시키기 (CommandLineRunner & ApplicationRunner)

Engineering Blog by Dale Seo

www.daleseo.com

 

'DataBase' 카테고리의 다른 글

[Database] MongoDB 연동하기  (5) 2025.07.09
[DataBase] DeadLock 해결하기, JPA 쿼리 실행 순서  (0) 2025.04.18