Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use BOSS indexing in DBGSuccinct + make RowDiff independent #484

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f2e15e4
Add row_diff_traverse, row_diff_successor
adamant-pwn May 23, 2024
f3fb0b0
Update, propagate to DBGSuccinct
adamant-pwn Jul 2, 2024
d7878bf
Use graph::DeBruijnGraph in build_pred_succ and assign_anchors
adamant-pwn Oct 7, 2024
3da66a9
Update download-artifact
adamant-pwn Oct 8, 2024
3a16332
Apply suggestions from code review
adamant-pwn Oct 8, 2024
76b29b7
Merge master to rowdiff (#504)
adamant-pwn Oct 8, 2024
3a9f1c6
Try simplifying build_pred_succ, temporarily rollback to graph.get_la…
adamant-pwn Oct 8, 2024
b18d463
Special handling of last.size()
adamant-pwn Oct 8, 2024
629cb09
Use last instead of rd_succ if it's empty
adamant-pwn Oct 9, 2024
71c19a1
Add checks to fix integration tests
adamant-pwn Oct 9, 2024
90abc08
Use BOSS index space in DBGSuccinct
adamant-pwn Oct 9, 2024
f019a05
override final for call_nodes + add select/rank node + some fixes
adamant-pwn Oct 9, 2024
358e445
num_nodes() -> max_index() for dbg_succ_
adamant-pwn Oct 9, 2024
1068116
Add is_valid checks to nodes in DBGSuccinct
adamant-pwn Oct 9, 2024
42707ff
Fix DBGSuccinct tests
adamant-pwn Oct 9, 2024
a49f07f
Fix AnnotatedDBG test group
adamant-pwn Oct 9, 2024
68dd5f5
Fix RowDiff tests
adamant-pwn Oct 10, 2024
ee0c4a8
Return npos in certain callbacks for dummy nodes
adamant-pwn Oct 10, 2024
a46f6a4
Validate edges on BOSS edge -> DBG node transition
adamant-pwn Oct 10, 2024
60851da
More validate_edge checks
adamant-pwn Oct 10, 2024
91ea56f
Use is_valid in adjacent_outgoing_rc_strand
adamant-pwn Oct 10, 2024
1976be1
Fix identation + annotations without succ
adamant-pwn Oct 10, 2024
4e971da
Fix RowDiff test
adamant-pwn Oct 10, 2024
7737e26
Move get_last, row_diff_traverse, row_diff_successor into row_diff_bu…
adamant-pwn Oct 10, 2024
04740f1
Preserve lifetime of get_last
adamant-pwn Oct 10, 2024
e2347d6
Fix integration tests
adamant-pwn Oct 15, 2024
94e9f90
Fix integration tests + return dict in _get_stats
adamant-pwn Oct 15, 2024
3bc714d
Apply review suggestions
adamant-pwn Nov 4, 2024
3a87651
Use valid_edges_->call_ones
adamant-pwn Nov 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ on:
tags:
- 'v*'
pull_request:
branches:
- master

env:
REGISTRY: ghcr.io
Expand Down Expand Up @@ -213,7 +211,7 @@ jobs:
python-version: 3.8

- name: fetch static binary
uses: actions/download-artifact@v2
uses: actions/download-artifact@v4
with:
path: artifacts

Expand Down Expand Up @@ -285,7 +283,7 @@ jobs:
run: git submodule update --init --recursive

- name: fetch static binary
uses: actions/download-artifact@v2
uses: actions/download-artifact@v4
with:
path: artifacts

Expand Down
6 changes: 4 additions & 2 deletions metagraph/src/annotation/annotation_converters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <tsl/hopscotch_map.h>

#include "row_diff_builder.hpp"
#include "cli/load/load_graph.hpp"
#include "common/logger.hpp"
#include "common/algorithms.hpp"
#include "common/hashers/hash.hpp"
Expand Down Expand Up @@ -1539,12 +1540,13 @@ void convert_to_row_diff(const std::vector<std::string> &files,
if (out_dir.empty())
out_dir = "./";

auto graph = cli::load_critical_dbg(graph_fname);
if (construction_stage != RowDiffStage::COUNT_LABELS)
build_pred_succ(graph_fname, graph_fname, out_dir,
build_pred_succ(*graph, graph_fname, out_dir,
".row_count", get_num_threads());

if (construction_stage == RowDiffStage::CONVERT) {
assign_anchors(graph_fname, graph_fname, out_dir, max_path_length,
assign_anchors(*graph, graph_fname, out_dir, max_path_length,
".row_reduction", get_num_threads());

const std::string anchors_fname = graph_fname + kRowDiffAnchorExt;
Expand Down
125 changes: 73 additions & 52 deletions metagraph/src/annotation/row_diff_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,25 @@ void sum_and_call_counts(const fs::path &dir,
}
}

rd_succ_bv_type route_at_forks(const graph::DBGSuccinct &graph,
uint64_t from_graph_index(const graph::DeBruijnGraph &graph,
graph::DeBruijnGraph::node_index idx) {
if (auto* g = dynamic_cast<graph::DBGSuccinct const*>(&graph)) {
return g->kmer_to_boss_index(idx);
} else {
return idx;
}
}

graph::DeBruijnGraph::node_index to_graph_index(const graph::DeBruijnGraph &graph,
uint64_t idx) {
if (auto* g = dynamic_cast<graph::DBGSuccinct const*>(&graph)) {
return g->boss_to_kmer_index(idx);
} else {
return idx;
}
}

rd_succ_bv_type route_at_forks(const graph::DeBruijnGraph &graph,
const std::string &rd_succ_filename,
const std::string &count_vectors_dir,
const std::string &row_count_extension) {
Expand All @@ -282,7 +300,7 @@ rd_succ_bv_type route_at_forks(const graph::DBGSuccinct &graph,
logger->trace("RowDiff successors will be set to the adjacent nodes with"
" the largest number of labels");

const bit_vector &last = graph.get_boss().get_last();
const bit_vector &last = *graph.get_last();
hmusta marked this conversation as resolved.
Show resolved Hide resolved
graph::DeBruijnGraph::node_index graph_idx = to_node(0);

std::vector<uint32_t> outgoing_counts;
Expand All @@ -293,12 +311,12 @@ rd_succ_bv_type route_at_forks(const graph::DBGSuccinct &graph,
[&](int32_t count) {
// TODO: skip single outgoing
outgoing_counts.push_back(count);
if (last[graph.kmer_to_boss_index(graph_idx)]) {
if (last[from_graph_index(graph, graph_idx)]) {
// pick the node with the largest count
size_t max_pos = std::max_element(outgoing_counts.rbegin(),
outgoing_counts.rend())
- outgoing_counts.rbegin();
rd_succ_bv[graph.kmer_to_boss_index(graph_idx - max_pos)] = true;
rd_succ_bv[from_graph_index(graph, graph_idx - max_pos)] = true;
outgoing_counts.resize(0);
}
graph_idx++;
adamant-pwn marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -326,7 +344,7 @@ rd_succ_bv_type route_at_forks(const graph::DBGSuccinct &graph,
return rd_succ;
}

void build_pred_succ(const std::string &graph_fname,
void build_pred_succ(const graph::DeBruijnGraph &graph,
const std::string &outfbase,
const std::string &count_vectors_dir,
const std::string &row_count_extension,
Expand All @@ -342,21 +360,16 @@ void build_pred_succ(const std::string &graph_fname,
logger->trace("Building and writing successor and predecessor files to {}.*",
outfbase);

graph::DBGSuccinct graph(2);
logger->trace("Loading graph...");
if (!graph.load(graph_fname)) {
logger->error("Cannot load graph from {}", graph_fname);
std::exit(1);
std::optional<sdsl::bit_vector> dummy;
auto* succinct = dynamic_cast<graph::DBGSuccinct const*>(&graph);
if (succinct) {
dummy = succinct->get_boss().mark_all_dummy_edges(num_threads);
}

// assign row-diff successors at forks
rd_succ_bv_type rd_succ = route_at_forks(graph, outfbase + kRowDiffForkSuccExt,
count_vectors_dir, row_count_extension);

const BOSS &boss = graph.get_boss();

sdsl::bit_vector dummy = boss.mark_all_dummy_edges(num_threads);

// create the succ/pred files, indexed using annotation indices
uint32_t width = sdsl::bits::hi(graph.num_nodes()) + 1;
sdsl::int_vector_buffer<> succ(outfbase + ".succ", std::ios::out, BUFFER_SIZE, width);
Expand All @@ -368,7 +381,7 @@ void build_pred_succ(const std::string &graph_fname,
!common::get_verbose());

const uint64_t BS = 1'000'000;
// traverse BOSS table in parallel processing blocks of size |BS|
// traverse graph in parallel processing blocks of size |BS|
// use static scheduling to make threads process ordered contiguous blocks
#pragma omp parallel for ordered num_threads(num_threads) schedule(dynamic)
for (uint64_t start = 1; start <= graph.num_nodes(); start += BS) {
Expand All @@ -378,34 +391,49 @@ void build_pred_succ(const std::string &graph_fname,
std::vector<bool> pred_boundary_buf;

for (uint64_t i = start; i < std::min(start + BS, graph.num_nodes() + 1); ++i) {
BOSS::edge_index boss_idx = graph.kmer_to_boss_index(i);
if (!dummy[boss_idx]) {
BOSS::edge_index next = boss.fwd(boss_idx);
assert(next);
if (!dummy[next]) {
while (rd_succ.size() && !rd_succ[next]) {
next--;
assert(!boss.get_last(next));
if (succinct) { // Legacy code for DBGSuccinct
const BOSS &boss = succinct->get_boss();
BOSS::edge_index boss_idx = succinct->kmer_to_boss_index(i);
adamant-pwn marked this conversation as resolved.
Show resolved Hide resolved
if (!(*dummy)[boss_idx]) {
BOSS::edge_index next = boss.fwd(boss_idx);
assert(next);
if (!(*dummy)[next]) {
while (rd_succ.size() && !rd_succ[next]) {
next--;
assert(!boss.get_last(next));
}
succ_buf.push_back(to_row(succinct->boss_to_kmer_index(next)));
succ_boundary_buf.push_back(0);
}
succ_buf.push_back(to_row(graph.boss_to_kmer_index(next)));
succ_boundary_buf.push_back(0);
}
// compute predecessors only for row-diff successors
if (rd_succ.size() ? rd_succ[boss_idx] : boss.get_last(boss_idx)) {
BOSS::TAlphabet d = boss.get_node_last_value(boss_idx);
BOSS::edge_index back_idx = boss.bwd(boss_idx);
boss.call_incoming_to_target(back_idx, d,
[&](BOSS::edge_index pred) {
// dummy predecessors are ignored
if (!dummy[pred]) {
uint64_t node_index = graph.boss_to_kmer_index(pred);
pred_buf.push_back(to_row(node_index));
pred_boundary_buf.push_back(0);
// compute predecessors only for row-diff successors
if (rd_succ.size() ? rd_succ[boss_idx] : boss.get_last(boss_idx)) {
BOSS::TAlphabet d = boss.get_node_last_value(boss_idx);
BOSS::edge_index back_idx = boss.bwd(boss_idx);
boss.call_incoming_to_target(back_idx, d,
[&](BOSS::edge_index pred) {
// dummy predecessors are ignored
if (!(*dummy)[pred]) {
uint64_t node_index = succinct->boss_to_kmer_index(pred);
pred_buf.push_back(to_row(node_index));
pred_boundary_buf.push_back(0);
}
}
}
);
);
}
}
adamant-pwn marked this conversation as resolved.
Show resolved Hide resolved
} else {
auto j = graph.row_diff_successor(i, rd_succ);
succ_buf.push_back(to_row(j));
succ_boundary_buf.push_back(0);

if(rd_succ[i]) {
graph.adjacent_incoming_nodes(i, [&](auto pred) {
pred_buf.push_back(to_row(pred));
pred_boundary_buf.push_back(0);
});
}
}

succ_boundary_buf.push_back(1);
pred_boundary_buf.push_back(1);
++progress_bar;
Expand All @@ -424,7 +452,7 @@ void build_pred_succ(const std::string &graph_fname,
logger->trace("Pred/succ nodes written to {}.pred/succ", outfbase);
}

void assign_anchors(const std::string &graph_fname,
void assign_anchors(const graph::DeBruijnGraph &graph,
const std::string &outfbase,
const std::filesystem::path &count_vectors_dir,
uint32_t max_length,
Expand All @@ -436,13 +464,6 @@ void assign_anchors(const std::string &graph_fname,
return;
}

graph::DBGSuccinct graph(2);
logger->trace("Loading graph...");
if (!graph.load(graph_fname)) {
logger->error("Cannot load graph from {}", graph_fname);
std::exit(1);
}
const BOSS &boss = graph.get_boss();
const uint64_t num_rows = graph.num_nodes();

bool optimize_anchors = false;
Expand All @@ -451,7 +472,7 @@ void assign_anchors(const std::string &graph_fname,
optimize_anchors = true;
}

sdsl::bit_vector anchors_bv(boss.get_last().size(), false);
sdsl::bit_vector anchors_bv(graph.get_last()->size(), false);
adamant-pwn marked this conversation as resolved.
Show resolved Hide resolved

if (optimize_anchors) {
logger->trace("Making every row with negative reduction an anchor...");
Expand All @@ -461,7 +482,7 @@ void assign_anchors(const std::string &graph_fname,
[&](int32_t count) {
// check if the reduction is negative
if (count < 0)
anchors_bv[graph.kmer_to_boss_index(to_node(i))] = true;
anchors_bv[from_graph_index(graph, to_node(i))] = true;
i++;
}
);
Expand Down Expand Up @@ -492,11 +513,11 @@ void assign_anchors(const std::string &graph_fname,

if (rd_succ.size()) {
logger->trace("Assigning anchors for RowDiff successors {}...", rd_succ_fname);
boss.row_diff_traverse(num_threads, max_length, rd_succ, &anchors_bv);
graph.row_diff_traverse(num_threads, max_length, rd_succ, &anchors_bv);
} else {
logger->warn("Assigning anchors without chosen RowDiff successors."
" The last outgoing edges will be used for routing.");
boss.row_diff_traverse(num_threads, max_length, boss.get_last(), &anchors_bv);
graph.row_diff_traverse(num_threads, max_length, *graph.get_last(), &anchors_bv);
}
}

Expand All @@ -505,7 +526,7 @@ void assign_anchors(const std::string &graph_fname,
sdsl::bit_vector anchors(num_rows, false);
for (BOSS::edge_index i = 1; i < anchors_bv.size(); ++i) {
if (anchors_bv[i]) {
uint64_t graph_idx = graph.boss_to_kmer_index(i);
uint64_t graph_idx = to_graph_index(graph, i);
assert(to_row(graph_idx) < num_rows);
anchors[to_row(graph_idx)] = 1;
}
Expand Down
4 changes: 2 additions & 2 deletions metagraph/src/annotation/row_diff_builder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ void count_labels_per_row(const std::vector<std::string> &source_files,
const std::string &row_count_fname,
bool with_coordinates = false);

void build_pred_succ(const std::string &graph_filename,
void build_pred_succ(const graph::DeBruijnGraph &graph,
const std::string &outfbase,
const std::string &count_vectors_dir,
const std::string &row_count_extension,
uint32_t num_threads);

void assign_anchors(const std::string &graph_filename,
void assign_anchors(const graph::DeBruijnGraph &graph,
const std::string &outfbase,
const std::filesystem::path &dest_dir,
uint32_t max_length,
Expand Down
66 changes: 66 additions & 0 deletions metagraph/src/graph/representation/base/sequence_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <sdsl/int_vector.hpp>

#include "common/logger.hpp"
#include "common/vectors/bit_vector_dyn.hpp"
#include "common/seq_tools/reverse_complement.hpp"
#include "common/threads/threading.hpp"
#include "common/vectors/vector_algorithm.hpp"
Expand Down Expand Up @@ -420,6 +421,71 @@ void DeBruijnGraph::call_unitigs(const CallPath &callback,
::mtg::graph::call_sequences(*this, callback, num_threads, true, min_tip_size, kmers_in_single_form);
}

std::shared_ptr<const bit_vector> DeBruijnGraph::get_last() const {
bit_vector_dyn last_bv(max_index() + 1);
call_nodes([&](node_index v) {
std::pair<char, node_index> last;
call_outgoing_kmers(v, [&](node_index u, char c) {
last = std::max(last, std::pair{c, u});
});
last_bv.set(last.second, true);
});
return std::make_shared<bit_vector_dyn>(std::move(last_bv));
}

void DeBruijnGraph::row_diff_traverse(size_t num_threads,
size_t max_length,
const bit_vector &rd_succ,
sdsl::bit_vector *terminal) const {
sdsl::bit_vector visited(max_index() + 1);
auto finalised = visited;
std::vector<size_t> distance(max_index() + 1);
assert(terminal->size() == visited.size());
assert(rd_succ.size() == visited.size());
auto set_terminal = [&](int v) {
distance[v] = 0;
(*terminal)[v] = true;
};
call_nodes([&](node_index v) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we know this works, can we add num_threads as a parameter to call_nodes so we can do this with multiple threads?

The set_bit, fetch_bit, etc. methods used in BOSS::row_diff_traverse can be used here for thread-safe atomic operations on sdsl::bit_vector.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, might be best to get back to it after some initial version is landed.

static std::stack<node_index> path;
while (!visited[v]) {
path.push(v);
visited[v] = true;
if (!has_no_outgoing(v)) {
v = row_diff_successor(v, rd_succ);
}
}
// Either a sink, or a cyclic dependency
if (!finalised[v]) {
set_terminal(v);
finalised[v] = true;
}
node_index succ;
while (!empty(path)) {
succ = std::exchange(v, path.top());
if (!finalised[v]) {
distance[v] = distance[succ] + 1;
if (distance[v] == max_length) {
set_terminal(v);
}
finalised[v] = true;
}
path.pop();
}
});
}

node_index DeBruijnGraph::row_diff_successor(node_index node, const bit_vector &rd_succ) const {
node_index succ = npos;
adjacent_outgoing_nodes(node, [&](node_index adjacent_node) {
if(rd_succ[adjacent_node]) {
succ = adjacent_node;
}
});
assert(succ != npos && "a row diff successor must exist");
return succ;
}

/**
* Traverse graph and iterate over all nodes
*/
Expand Down
Loading
Loading