Cluster-to-Cluster Data Migration

Migrate data between Elasticsearch clusters using Logstash — full index migration, field filtering, scheduled sync, and Kafka pipelines.

30m (15m reading, 15m lab)

Project Structure

📁 data-migration-logstash
├── 📄 docker-compose.yml
├── 📄 .env
└── 📁 pipeline
    └── 📄 migration.conf

Migration Approaches

| Method | Best For | Complexity |
| --- | --- | --- |
| Logstash | Filtered/transformed migration | Medium |
| Reindex API | Same-version, simple copy | Low |
| Remote Reindex | Cross-cluster, basic copy | Low |
| Snapshot/Restore | Full cluster backup/restore | Low |

This lesson focuses on Logstash for its flexibility. See Reindexing for the API approach.
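For comparison, the Remote Reindex route is a single API call against the destination cluster. This is a sketch, not a runnable test: the host and index names are placeholders, it needs live clusters, and the source host must first be whitelisted in `reindex.remote.whitelist` on the destination's elasticsearch.yml:

```shell
# Run against the DESTINATION cluster; host and index names are placeholders.
curl -X POST "http://dest:9200/_reindex?pretty" \
  -H 'Content-Type: application/json' \
  -d '{
    "source": {
      "remote": { "host": "http://source:9200" },
      "index": "my-data"
    },
    "dest": { "index": "migrated-data" }
  }'
```

Remote reindex copies documents as-is; for field filtering and transformation of the kind shown below, Logstash is the more flexible tool.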

Logstash Elasticsearch-to-Elasticsearch Pipeline

Docker Compose Setup

Download the migration compose file:

wget https://raw.githubusercontent.com/VibhuviOiO/infinite-containers/main/elastic-stack/docker-compose-data-migration.yml -O docker-compose.yml

Migration Pipeline

Create pipeline/migration.conf:

input {
  elasticsearch {
    hosts => ["${SOURCE_ES_HOST}:9200"]
    index => "${SOURCE_INDEX}"
    size => 1000
    scroll => "1m"
    docinfo => true
    # Pin the target so %{[@metadata][_id]} resolves even on plugin versions where
    # ECS compatibility moves doc info under [@metadata][input][elasticsearch]
    docinfo_target => "[@metadata]"
    docinfo_fields => ["_index", "_id"]
  }
}

filter {
  # Remove PII fields during migration
  mutate {
    remove_field => [
      "alternateEmails",
      "alternatePhoneNumbers",
      "email",
      "@version",
      "@timestamp"
    ]
  }
}

output {
  elasticsearch {
    hosts => ["${DEST_ES_HOST}:9200"]
    index => "${DEST_INDEX}"
    document_id => "%{[@metadata][_id]}"
    workers => 1
  }

  # Debug output (remove in production)
  stdout {
    codec => dots
  }
}

Environment Variables

# .env file
SOURCE_ES_HOST=source-elasticsearch
SOURCE_INDEX=my-data-*
DEST_ES_HOST=dest-elasticsearch
DEST_INDEX=migrated-data
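If you assemble the compose file yourself instead of downloading it, the Logstash service needs the `.env` values and the pipeline directory mounted. A minimal sketch — the image tag and service names here are assumptions, not taken from the downloaded file:

```yaml
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.13.0   # version is an assumption
    env_file: .env                                      # provides SOURCE_ES_HOST, DEST_INDEX, ...
    volumes:
      - ./pipeline:/usr/share/logstash/pipeline:ro      # migration.conf is picked up from here
    depends_on:
      - source-elasticsearch
      - dest-elasticsearch
```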

Run the Migration

docker compose up -d

Scheduled Sync

For continuous synchronization between clusters, add a cron schedule:

input {
  elasticsearch {
    hosts => ["${SOURCE_ES_HOST}:9200"]
    index => "${SOURCE_INDEX}"
    size => 1000
    scroll => "1m"
    schedule => "*/5 * * * *"
    docinfo => true
    docinfo_fields => ["_index", "_id"]
    query => '{ "query": { "range": { "updated_at": { "gte": "now-6m" }}}}'
  }
}
This runs every 5 minutes and fetches only documents updated in the last 6 minutes; the 1-minute overlap guards against scheduling jitter and clock skew. If the output sets `document_id` from `[@metadata][_id]` as in the pipeline above, documents fetched twice are simply overwritten rather than duplicated. Note that deletions on the source are not propagated by this approach.

Migrating with Authentication

When both clusters have security enabled:

input {
  elasticsearch {
    hosts => ["https://source:9200"]
    index => "production-logs-*"
    user => "migration_reader"
    password => "${SOURCE_PASSWORD}"
    ssl_certificate_authorities => ["/certs/source-ca.crt"]
    size => 1000
    scroll => "5m"
  }
}

output {
  elasticsearch {
    hosts => ["https://dest:9200"]
    index => "imported-logs-%{+YYYY.MM.dd}"
    user => "migration_writer"
    password => "${DEST_PASSWORD}"
    ssl_certificate_authorities => ["/certs/dest-ca.crt"]
  }
}
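Plain-text passwords in config files leak easily. Two hedged alternatives, depending on your plugin versions: store the values in the Logstash keystore (`bin/logstash-keystore add SOURCE_PASSWORD`) so `${SOURCE_PASSWORD}` resolves without an environment variable, or authenticate with an API key instead of a user/password pair:

```conf
input {
  elasticsearch {
    hosts => ["https://source:9200"]
    index => "production-logs-*"
    # "id:api_key" pair created via the ES security API; the value is a placeholder
    api_key => "${SOURCE_API_KEY}"
    ssl_certificate_authorities => ["/certs/source-ca.crt"]
  }
}
```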

Multi-Index Migration

Migrate multiple indices while preserving their names:

input {
  elasticsearch {
    hosts => ["source:9200"]
    index => "*"
    size => 1000
    scroll => "5m"
    docinfo => true
    # Pin the target so the [@metadata][_index] checks below work in ECS mode too
    docinfo_target => "[@metadata]"
    docinfo_fields => ["_index", "_id"]
  }
}

filter {
  # Skip system indices
  if [@metadata][_index] =~ /^\..*/ {
    drop {}
  }
}

output {
  elasticsearch {
    hosts => ["dest:9200"]
    index => "%{[@metadata][_index]}"
    document_id => "%{[@metadata][_id]}"
  }
}

Data Transformation During Migration

Rename Fields

filter {
  mutate {
    rename => {
      "old_field_name" => "new_field_name"
      # Nested fields need bracket notation; a bare "user.email" would match
      # a literal top-level field whose name contains a dot
      "[user][email]" => "contact_email"
    }
  }
}

Add Fields

filter {
  mutate {
    add_field => {
      "migrated_from" => "production-cluster"
      "migration_date" => "%{+YYYY-MM-dd}"
    }
  }
}

Filter by Date Range

input {
  elasticsearch {
    hosts => ["source:9200"]
    index => "logs-*"
    query => '{
      "query": {
        "range": {
          "@timestamp": {
            "gte": "2024-01-01",
            "lte": "2024-12-31"
          }
        }
      }
    }'
  }
}

Kafka-to-Elasticsearch Pipeline

For event-driven architectures, Logstash can consume from Kafka:

input {
  kafka {
    bootstrap_servers => "kafka-broker:9092"
    topics => ["application-events"]
    group_id => "es-consumer"
    codec => "json"
    consumer_threads => 3
    auto_offset_reset => "earliest"
  }
}

filter {
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "events-%{+YYYY.MM.dd}"
    user => "logstash_writer"
    password => "${ES_PASSWORD}"
  }
}
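One Kafka-specific caveat: `consumer_threads` beyond the topic's partition count only adds idle threads, not throughput. And if you need to know which topic or partition an event came from, the input can attach that metadata — hedged, since the option's form changed across plugin versions (`true` on older ones, `"basic"`/`"extended"` on newer ones):

```conf
input {
  kafka {
    bootstrap_servers => "kafka-broker:9092"
    topics => ["application-events"]
    decorate_events => true   # newer plugin versions take "basic" or "extended"
  }
}

filter {
  # [@metadata] fields are dropped at output, so copy what you want to keep
  mutate {
    add_field => { "kafka_topic" => "%{[@metadata][kafka][topic]}" }
  }
}
```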

Migration Performance Tuning

Pipeline Settings

In logstash.yml or pipelines.yml:

- pipeline.id: migration
  path.config: "/pipeline/migration.conf"
  pipeline.workers: 3
  pipeline.batch.size: 1000
  pipeline.batch.delay: 50

Elasticsearch Input Tuning

| Setting | Default | Recommended | Effect |
| --- | --- | --- | --- |
| size | 1000 | 5000 | Documents per scroll batch |
| scroll | 1m | 5m | Scroll context timeout |
| slices | 1 | 2-8 | Parallel scroll slices |

Unlike the Reindex API, the Logstash input takes a numeric `slices` value; there is no `auto`.
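Applied to the migration input, the tuned settings look like this (a sketch; `slices` takes a number in the Logstash input and should not exceed the shard count of the source index):

```conf
input {
  elasticsearch {
    hosts => ["${SOURCE_ES_HOST}:9200"]
    index => "${SOURCE_INDEX}"
    size => 5000      # larger scroll batches, fewer round trips
    scroll => "5m"    # keep the scroll context alive through slow batches
    slices => 4       # parallel sliced scrolls
    docinfo => true
    docinfo_fields => ["_index", "_id"]
  }
}
```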

Elasticsearch Output Tuning

| Setting | Default | Recommended | Effect |
| --- | --- | --- | --- |
| workers | 1 | 2-4 | Parallel output workers |
| flush_size | 500 | 1000 | Bulk request size |
| idle_flush_time | 1s | 5s | Flush interval |

Note that `flush_size` and `idle_flush_time` were removed in newer versions of the Elasticsearch output plugin; there, bulk sizing follows `pipeline.batch.size` and `pipeline.batch.delay` instead.

Monitoring Migration Progress

Check Logstash Pipeline Stats

curl -s "http://logstash:9600/_node/stats/pipelines?pretty"

Check Destination Index Count

# Compare source and destination counts
echo "Source:"
curl -s "http://source:9200/my-index/_count?pretty"

echo "Destination:"
curl -s "http://dest:9200/migrated-index/_count?pretty"
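A small script can compare the two numbers for you. A sketch with hard-coded placeholder counts — in practice each variable would be filled from the `_count` responses above (for example via `jq .count`):

```shell
# Placeholder values; substitute the real counts from the _count calls above.
src_count=120000
dst_count=120000

if [ "$src_count" -eq "$dst_count" ]; then
  echo "counts match: $src_count documents"
else
  echo "MISMATCH: source=$src_count destination=$dst_count"
fi
```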

Migration Checklist

| Step | Action |
| --- | --- |
| 1 | Verify source cluster health |
| 2 | Check destination cluster has enough disk space |
| 3 | Create index template on destination (mappings + settings) |
| 4 | Test pipeline with a small index first |
| 5 | Run full migration |
| 6 | Compare document counts |
| 7 | Spot-check sample documents |
| 8 | Update application connection strings |
| 9 | Monitor destination cluster performance |
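Step 3 matters because otherwise the first bulk write auto-creates mappings by type inference. A hedged sketch of creating a template up front — the field names and settings are illustrative, and it needs a live destination cluster to run:

```shell
# Hypothetical template for the destination index; adjust mappings to your data.
curl -X PUT "http://dest:9200/_index_template/migrated-data" \
  -H 'Content-Type: application/json' \
  -d '{
    "index_patterns": ["migrated-data*"],
    "template": {
      "settings": { "number_of_shards": 1, "number_of_replicas": 1 },
      "mappings": {
        "properties": { "updated_at": { "type": "date" } }
      }
    }
  }'
```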

Lab: Migrate Data Between Clusters

  1. Download the migration compose file
  2. Set up source and destination clusters
  3. Create a test index with sample data on the source
  4. Configure the Logstash migration pipeline
  5. Run the migration and verify document counts match
  6. Add a filter to remove sensitive fields
  7. Re-run and verify the fields are stripped
