Support backups of DSE with Search
rzvoncek committed Dec 8, 2023
1 parent 4ddaeeb commit 4d5efbe
Showing 27 changed files with 1,676 additions and 79 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/ci.yml
@@ -304,14 +304,15 @@ jobs:
- uses: codecov/codecov-action@v1
name: Report code coverage

k8ssandra-e2e-tests:
needs: [build]
runs-on: ubuntu-latest
strategy:
matrix:
e2e_test:
- CreateSingleMedusaJob
- CreateSingleDseSearchDatacenterCluster
fail-fast: false
name: k8ssandra-${{ matrix.e2e_test }}
env:
@@ -343,7 +344,7 @@ jobs:
- uses: actions/checkout@v3
with:
repository: k8ssandra/k8ssandra-operator
ref: main
ref: radovan/ec2-dse-medusa
path: k8ssandra-operator
- name: Set up Go
uses: actions/setup-go@v3
3 changes: 2 additions & 1 deletion .gitignore
@@ -29,4 +29,5 @@ debian/cassandra-medusa/
.coverage
coverage.xml
pytest.ini
.python-version
.python-version
tests/resources/dse/dse-*
19 changes: 18 additions & 1 deletion medusa/backup_node.py
@@ -224,11 +224,16 @@ def start_backup(storage, node_backup, cassandra, differential_mode, stagger_tim

logging.info('Saving tokenmap and schema')
schema, tokenmap = get_schema_and_tokenmap(cassandra)

node_backup.schema = schema
node_backup.tokenmap = json.dumps(tokenmap)

logging.info('Saving server version')
server_type, release_version = get_server_type_and_version(cassandra)
node_backup.server_version = json.dumps({'server_type': server_type, 'release_version': release_version})

if differential_mode is True:
node_backup.differential = mode

add_backup_start_to_index(storage, node_backup)

if stagger_time:
@@ -274,6 +279,13 @@ def get_schema_and_tokenmap(cassandra):
return schema, tokenmap


@retry(stop_max_attempt_number=7, wait_exponential_multiplier=10000, wait_exponential_max=120000)
def get_server_type_and_version(cassandra):
with cassandra.new_session() as cql_session:
server_type, release_version = cql_session.get_server_type_and_release_version()
return server_type, release_version


def do_backup(cassandra, node_backup, storage, differential_mode, enable_md5_checks,
config, backup_name):
# Load last backup as a cache
@@ -294,6 +306,11 @@ def do_backup(cassandra, node_backup, storage, differential_mode, enable_md5_che
manifest = []
num_files = backup_snapshots(storage, manifest, node_backup, node_backup_cache, snapshot)

if node_backup.is_dse:
logging.info('Creating DSE snapshot')
with cassandra.create_dse_snapshot(backup_name) as snapshot:
num_files += backup_snapshots(storage, manifest, node_backup, node_backup_cache, snapshot)

logging.info('Updating backup index')
node_backup.manifest = json.dumps(manifest)
add_backup_finish_to_index(storage, node_backup)
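
A note on the new metadata: the server_version blob saved alongside each node backup is plain JSON, and is presumably what drives the node_backup.is_dse check used above. A minimal sketch of its shape (version values are assumed examples, not taken from this commit):

import json

# Illustrative only: the shape of node_backup.server_version after this change.
server_type, release_version = 'dse', '6.8.25'  # assumed example values
blob = json.dumps({'server_type': server_type, 'release_version': release_version})
print(blob)  # {"server_type": "dse", "release_version": "6.8.25"}
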
88 changes: 87 additions & 1 deletion medusa/cassandra_utils.py
@@ -21,6 +21,7 @@
import logging
import os
import pathlib
import shutil
import shlex
import socket
import subprocess
@@ -218,6 +219,17 @@ def schema_path_mapping(self):
def execute(self, query):
return self.session.execute(query)

def get_server_type_and_release_version(self):
server_type = 'cassandra'
release_version = 'None'
rows = self.session.execute("SELECT * FROM system.local")
for row in rows:
if hasattr(row, 'dse_version'):
server_type = 'dse'
if hasattr(row, 'release_version'):
release_version = row.release_version
return server_type, release_version


class CassandraConfigReader(object):
DEFAULT_CASSANDRA_CONFIG = '/etc/cassandra/cassandra.yaml'
@@ -329,6 +341,7 @@ def seeds(self):

class Cassandra(object):
SNAPSHOT_PATTERN = '*/*/snapshots/{}'
DSE_SNAPSHOT_PATTERN = '*/snapshots/{}'
SNAPSHOT_PREFIX = 'medusa-'

def __init__(self, config, contact_point=None, release_version=None):
@@ -342,6 +355,8 @@ def __init__(self, config, contact_point=None, release_version=None):
config_reader = CassandraConfigReader(cassandra_config.config_file, release_version)
self._cassandra_config_file = cassandra_config.config_file
self._root = config_reader.root
self._dse_root = self._root.parent
self._dse_metadata_folder = 'metadata'
self._commitlog_path = config_reader.commitlog_directory
self._saved_caches_path = config_reader.saved_caches_directory
self._hostname = contact_point if contact_point is not None else config_reader.listen_address
@@ -404,6 +419,69 @@ def rpc_port(self):
def release_version(self):
return self._release_version

@property
def dse_metadata_path(self):
return self._dse_root / self._dse_metadata_folder

@property
def dse_search_path(self):
# the DSE Search files are next to regular keyspace folders, but are not a real keyspace
return self._root / 'solr.data'

def rebuild_search_index(self):
logging.debug('Opening new session to restore DSE indexes')
with self._cql_session_provider.new_session() as session:
rows = session.execute("SELECT core_name FROM solr_admin.solr_resources")
fqtns_with_index = {r.core_name for r in rows}
for fqtn in fqtns_with_index:
logging.debug(f'Rebuilding search index for {fqtn}')
session.execute(f"REBUILD SEARCH INDEX ON {fqtn}")

def create_dse_snapshot(self, backup_name):
"""
There is no good way of making a snapshot of DSE files.
They are not SSTables, so we cannot just hard-link them and get immutable files to work with.
So we use a poor man's alternative: we simply copy them into a folder.
That folder is nested in the parent folder, just like for regular tables.
This way, we can reuse a lot of code later on.
"""
tag = "{}{}".format(self.SNAPSHOT_PREFIX, backup_name)
if not self.dse_snapshot_exists(tag):
src_path = self._dse_root / self._dse_metadata_folder
dst_path = self._dse_root / self._dse_metadata_folder / 'snapshots' / tag
shutil.copytree(src_path, dst_path)

return Cassandra.DseSnapshot(self, tag)

class DseSnapshot(object):

def __init__(self, parent, tag):
self._parent = parent
self._tag = tag

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
logging.debug('Cleaning up DSE snapshot')
self.delete()

def find_dirs(self):
dse_folder = self._parent._dse_metadata_folder
return [
SnapshotPath(
pathlib.Path(self._parent._dse_root) / dse_folder / 'snapshots', 'dse', dse_folder
)
]

def delete(self):
dse_folder = self._parent._dse_metadata_folder
dse_folder_path = self._parent._dse_root / dse_folder / 'snapshots' / self._tag
shutil.rmtree(dse_folder_path)

def __repr__(self):
return '{}<{}>'.format(self.__class__.__qualname__, self._tag)

class Snapshot(object):
def __init__(self, parent, tag):
self._parent = parent
@@ -413,7 +491,7 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
logging.debug('Cleaning up snapshot')
logging.debug('Cleaning up Cassandra snapshot')
self.delete()

@property
@@ -477,6 +555,14 @@ def snapshot_exists(self, tag):
return True
return False

def dse_snapshot_exists(self, tag):
# DSE files live one directory up from the data folder;
# the root field should point to the data directory as defined in cassandra.yaml
for snapshot in self._dse_root.glob(self.DSE_SNAPSHOT_PATTERN.format('*')):
if snapshot.is_dir() and snapshot.name == tag:
return True
return False

def create_snapshot_command(self, backup_name):
"""
:param backup_name: string name of the medusa backup
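
The DseSnapshot class mirrors the existing Snapshot context manager, so backup code can treat both snapshot kinds uniformly. A rough usage sketch under assumed setup ('config' and 'upload_dir' are stand-ins, not part of this commit):

# Rough usage sketch; 'config' and 'upload_dir' are assumed stand-ins.
cassandra = Cassandra(config)
with cassandra.create_dse_snapshot('mybackup') as dse_snapshot:
    for snapshot_path in dse_snapshot.find_dirs():
        upload_dir(snapshot_path)  # hypothetical upload of the copied metadata
# __exit__ then removes metadata/snapshots/medusa-mybackup via shutil.rmtree
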
7 changes: 4 additions & 3 deletions medusa/config.py
@@ -38,9 +38,9 @@
'CassandraConfig',
['start_cmd', 'stop_cmd', 'config_file', 'cql_username', 'cql_password', 'check_running', 'is_ccm',
'sstableloader_bin', 'nodetool_username', 'nodetool_password', 'nodetool_password_file_path', 'nodetool_host',
'nodetool_port', 'certfile', 'usercert', 'userkey', 'sstableloader_ts', 'sstableloader_tspw',
'sstableloader_ks', 'sstableloader_kspw', 'nodetool_ssl', 'resolve_ip_addresses', 'use_sudo', 'nodetool_flags',
'cql_k8s_secrets_path', 'nodetool_k8s_secrets_path']
'nodetool_executable', 'nodetool_port', 'certfile', 'usercert', 'userkey', 'sstableloader_ts',
'sstableloader_tspw', 'sstableloader_ks', 'sstableloader_kspw', 'nodetool_ssl', 'resolve_ip_addresses', 'use_sudo',
'nodetool_flags', 'cql_k8s_secrets_path', 'nodetool_k8s_secrets_path']
)

SSHConfig = collections.namedtuple(
@@ -136,6 +136,7 @@ def _build_default_config():
'sstableloader_bin': 'sstableloader',
'resolve_ip_addresses': 'True',
'use_sudo': 'True',
'nodetool_executable': 'nodetool',
'nodetool_flags': '-Dcom.sun.jndi.rmiURLParsing=legacy'
}

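
Like the other settings in this namedtuple, the new nodetool_executable knob should be overridable from the [cassandra] section of medusa.ini; a minimal sketch of reading such an override (the section layout follows this file's defaults, the DSE path is an assumption):

import configparser

# Minimal sketch, assuming a medusa.ini with a [cassandra] section.
ini = configparser.ConfigParser(interpolation=None)
ini.read_string("""
[cassandra]
nodetool_executable = /opt/dse/bin/nodetool
""")
print(ini['cassandra']['nodetool_executable'])  # /opt/dse/bin/nodetool
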
3 changes: 2 additions & 1 deletion medusa/nodetool.py
@@ -17,8 +17,9 @@
class Nodetool(object):

def __init__(self, cassandra_config):
nodetool_executable = cassandra_config.nodetool_executable
nodetool_flags = cassandra_config.nodetool_flags.split(" ") if cassandra_config.nodetool_flags else []
self._nodetool = ['nodetool'] + nodetool_flags
self._nodetool = [nodetool_executable] + nodetool_flags
if cassandra_config.nodetool_ssl == "true":
self._nodetool += ['--ssl']
if cassandra_config.nodetool_username is not None:
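
With the executable configurable, pointing Medusa at a DSE-bundled nodetool becomes a config change rather than a PATH hack. A self-contained sketch of the command assembly above (stub config; the path is illustrative):

from types import SimpleNamespace

# Stub standing in for CassandraConfig; the DSE path is an assumption.
cfg = SimpleNamespace(nodetool_executable='/opt/dse/bin/nodetool',
                      nodetool_flags='-Dcom.sun.jndi.rmiURLParsing=legacy')
flags = cfg.nodetool_flags.split(" ") if cfg.nodetool_flags else []
cmd = [cfg.nodetool_executable] + flags
print(cmd)  # ['/opt/dse/bin/nodetool', '-Dcom.sun.jndi.rmiURLParsing=legacy']
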
23 changes: 19 additions & 4 deletions medusa/restore_node.py
@@ -98,7 +98,10 @@ def restore_node_locally(config, temp_dir, backup_name, in_place, keep_auth, see
# especially around system tables.
use_sudo = medusa.utils.evaluate_boolean(config.storage.use_sudo_for_restore)
clean_path(cassandra.commit_logs_path, use_sudo, keep_folder=True)
clean_path(cassandra.saved_caches_path, use_sudo, keep_folder=True)

if node_backup.is_dse:
clean_path(cassandra.dse_metadata_path, use_sudo, keep_folder=True)
clean_path(cassandra.dse_search_path, use_sudo, keep_folder=True)

# move backup data to Cassandra data directory according to system table
logging.info('Moving backup data to Cassandra data directory')
@@ -134,6 +137,12 @@ def restore_node_locally(config, temp_dir, backup_name, in_place, keep_auth, see
cassandra.start_with_implicit_token()
else:
cassandra.start(tokens)

# if we're restoring DSE, we need to explicitly trigger Search index rebuild
if node_backup.is_dse:
logging.info('Triggering DSE Search index rebuild')
cassandra.rebuild_search_index()

elif not in_place:
# Kubernetes will manage the lifecycle, but we still need to modify the tokens
cassandra.replace_tokens_in_cassandra_yaml_and_disable_bootstrap(tokens)
@@ -299,9 +308,15 @@ def maybe_restore_section(section, download_dir, cassandra_data_dir, in_place, k
logging.info('Keeping section {}.{} untouched'.format(section['keyspace'], section['columnfamily']))
return

src = download_dir / section['keyspace'] / section['columnfamily']
# not appending the column family name because mv later on copies the whole folder
dst = cassandra_data_dir / section['keyspace'] / section['columnfamily']
# 'dse' is an arbitrary name we gave to folders that don't sit in the regular place for keyspaces
# these are mostly DSE-internal files
if section['keyspace'] != 'dse':
src = download_dir / section['keyspace'] / section['columnfamily']
# not appending the column family name because mv later on copies the whole folder
dst = cassandra_data_dir / section['keyspace'] / section['columnfamily']
else:
src = download_dir / section['keyspace'] / section['columnfamily']
dst = cassandra_data_dir.parent / section['columnfamily']

# prepare the destination folder
if dst.exists():
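
The effect of the special-cased 'dse' keyspace name is easiest to see with concrete paths; a sketch with assumed directories (values illustrative, not from this commit):

from pathlib import Path

download_dir = Path('/tmp/medusa-restore')            # assumed staging dir
cassandra_data_dir = Path('/var/lib/cassandra/data')  # assumed data dir

# regular section: restored inside the data directory
src = download_dir / 'ks1' / 'table1'
dst = cassandra_data_dir / 'ks1' / 'table1'   # /var/lib/cassandra/data/ks1/table1

# 'dse' section: restored next to the data directory
src = download_dir / 'dse' / 'metadata'
dst = cassandra_data_dir.parent / 'metadata'  # /var/lib/cassandra/metadata
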
22 changes: 22 additions & 0 deletions medusa/storage/abstract_storage.py
@@ -240,6 +240,28 @@ def get_object_datetime(self, blob: AbstractBlob) -> datetime.datetime:
def hashes_match(manifest_hash, object_hash):
return base64.b64decode(manifest_hash).hex() == str(object_hash) or manifest_hash == str(object_hash)

@staticmethod
def path_maybe_with_parent(dest: str, src_path: Path) -> str:
"""
Works out which path to download or upload a file into.
@param dest : path to a directory where we'll be placing the file into
@param src_path : full path of a file (or object) we are reading
@returns : full path of the file or object we write
Medusa generally expects SSTables, which reside in .../keyspace/table/ (this is where dest points to).
But in some cases we have exceptions:
- secondary indexes are stored in whatever/data_folder/keyspace/table/.index_name/,
so we need to include the index name in the destination path
- DSE metadata resides in whatever/metadata, where there is only a `nodes` folder (DSE 6.8);
in principle, this is just like a 2i file structure, so we reuse all the other logic
"""
if src_path.parent.name.startswith(".") or src_path.parent.name.endswith('nodes'):
# secondary index file or a DSE metadata file
return "{}/{}/{}".format(dest, src_path.parent.name, src_path.name)
else:
# regular SSTable
return "{}/{}".format(dest, src_path.name)

def get_path_prefix(self, path=None) -> str:
return ""

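
A worked example of the three cases path_maybe_with_parent distinguishes (paths are illustrative):

from pathlib import Path
from medusa.storage.abstract_storage import AbstractStorage

# regular SSTable -> just the file name under dest
AbstractStorage.path_maybe_with_parent('ks1/t1', Path('/data/ks1/t1/nb-1-big-Data.db'))
# -> 'ks1/t1/nb-1-big-Data.db'

# secondary index -> the .index_name folder is kept
AbstractStorage.path_maybe_with_parent('ks1/t1', Path('/data/ks1/t1/.t1_idx/nb-1-big-Data.db'))
# -> 'ks1/t1/.t1_idx/nb-1-big-Data.db'

# DSE metadata -> the nodes folder is kept
AbstractStorage.path_maybe_with_parent('dse/metadata', Path('/var/lib/dse/metadata/nodes/node-info.db'))
# -> 'dse/metadata/nodes/node-info.db'
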
16 changes: 4 additions & 12 deletions medusa/storage/azure_storage.py
@@ -132,11 +132,7 @@ async def _download_blob(self, src: str, dest: str):
# we must make sure the blob gets stored under sub-folder (if there is any)
# the dest variable only points to the table folder, so we need to add the sub-folder
src_path = Path(src)
file_path = (
"{}/{}/{}".format(dest, src_path.parent.name, src_path.name)
if src_path.parent.name.startswith(".")
else "{}/{}".format(dest, src_path.name)
)
file_path = AbstractStorage.path_maybe_with_parent(dest, src_path)

if blob.size < int(self.config.multi_part_upload_threshold):
workers = 1
@@ -153,6 +149,7 @@ async def _download_blob(self, src: str, dest: str):
blob=object_key,
max_concurrency=workers,
)
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
await downloader.readinto(open(file_path, "wb"))

async def _stat_blob(self, object_key: str) -> AbstractBlob:
Expand All @@ -172,15 +169,10 @@ async def _stat_blob(self, object_key: str) -> AbstractBlob:

@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
src_chunks = src.split('/')
parent_name, file_name = src_chunks[-2], src_chunks[-1]
src_path = Path(src)

# check if objects resides in a sub-folder (e.g. secondary index). if it does, use the sub-folder in object path
object_key = (
"{}/{}/{}".format(dest, parent_name, file_name)
if parent_name.startswith(".")
else "{}/{}".format(dest, file_name)
)
object_key = AbstractStorage.path_maybe_with_parent(dest, src_path)

file_size = os.stat(src).st_size
logging.debug(
16 changes: 4 additions & 12 deletions medusa/storage/google_storage.py
@@ -138,11 +138,7 @@ async def _download_blob(self, src: str, dest: str):
# we must make sure the blob gets stored under sub-folder (if there is any)
# the dest variable only points to the table folder, so we need to add the sub-folder
src_path = Path(src)
file_path = (
"{}/{}/{}".format(dest, src_path.parent.name, src_path.name)
if src_path.parent.name.startswith(".")
else "{}/{}".format(dest, src_path.name)
)
file_path = AbstractStorage.path_maybe_with_parent(dest, src_path)

logging.debug(
'[Storage] Downloading gcs://{}/{} -> {}'.format(
@@ -156,6 +152,7 @@ async def _download_blob(self, src: str, dest: str):
object_name=object_key,
timeout=-1,
)
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'wb') as f:
while True:
chunk = await stream.read(DOWNLOAD_STREAM_CONSUMPTION_CHUNK_SIZE)
@@ -184,15 +181,10 @@ async def _stat_blob(self, object_key: str) -> AbstractBlob:

@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
src_chunks = src.split('/')
parent_name, file_name = src_chunks[-2], src_chunks[-1]
src_path = Path(src)

# check if objects resides in a sub-folder (e.g. secondary index). if it does, use the sub-folder in object path
object_key = (
"{}/{}/{}".format(dest, parent_name, file_name)
if parent_name.startswith(".")
else "{}/{}".format(dest, file_name)
)
object_key = AbstractStorage.path_maybe_with_parent(dest, src_path)

if src.startswith("gs"):
logging.debug(
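
Both object-store drivers now create the destination sub-folder before writing, since DSE metadata (like secondary indexes) downloads into a nested path that may not exist yet. The shared pattern in isolation (file name and bytes are stand-ins):

from pathlib import Path

data = b'...'  # stand-in for downloaded bytes
file_path = Path('ks1/t1/.t1_idx/nb-1-big-Data.db')
file_path.parent.mkdir(parents=True, exist_ok=True)  # nested dir must exist before open()
with open(file_path, 'wb') as f:
    f.write(data)
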
