Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions packages/supabase/lib/src/supabase_query_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {

/// Combines the current state of your table from PostgREST with changes from the realtime server to return real-time data from your table as a [Stream].
///
/// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/extensions/postgres-changes#replication-setup).
/// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/subscribing-to-database-changes#enable-postgres-changes).
///
/// Pass the list of primary key column names to [primaryKey], which will be used to update and delete the proper records internally as the stream receives real-time updates.
///
/// You may pass an optional [channelConfig] to configure the realtime channel to e.g., make it private.
///
/// It handles the lifecycle of the realtime connection and automatically refetches data from PostgREST when needed.
///
/// Make sure to provide `onError` and `onDone` callbacks to [Stream.listen] to handle errors and completion of the stream.
Expand All @@ -43,7 +45,10 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
/// ```dart
/// supabase.from('chats').stream(primaryKey: ['id']).eq('room_id','123').order('created_at').limit(20).listen(_onChatsReceived);
/// ```
SupabaseStreamFilterBuilder stream({required List<String> primaryKey}) {
SupabaseStreamFilterBuilder stream({
required List<String> primaryKey,
RealtimeChannelConfig? channelConfig,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think is better to add a new config type specific for configuring stream which contains only the private field, since the other ones doesn't apply here, and may confuse users.

Something like StreamChannelConfig.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sounds valid as well. But since there is just one option for the moment I would propose to use just 'private' as an additional argument to 'primaryKey' and leave out the additional class. What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that works too

}) {
Comment thread
Vinzent03 marked this conversation as resolved.
assert(primaryKey.isNotEmpty, 'Please specify primary key column(s).');
return SupabaseStreamFilterBuilder(
queryBuilder: this,
Expand All @@ -52,6 +57,7 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
schema: _schema,
table: _table,
primaryKey: primaryKey,
channelConfig: channelConfig,
);
}
}
16 changes: 14 additions & 2 deletions packages/supabase/lib/src/supabase_stream_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {

final String _realtimeTopic;

/// Realtime channel config for the stream.
///
/// Currently, only the [RealtimeChannelConfig.private] option affects the
/// `stream` method, since the `stream` method only handles postgres changes.
///
Comment thread
Vinzent03 marked this conversation as resolved.
Outdated
/// Defaults to the constructor of [RealtimeChannelConfig] with its respective
/// default values, which means the channel will be a public channel by
/// default.
final RealtimeChannelConfig _channelConfig;

RealtimeChannel? _channel;

final String _schema;
Expand Down Expand Up @@ -89,12 +99,14 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
required String schema,
required String table,
required List<String> primaryKey,
RealtimeChannelConfig? channelConfig,
}) : _queryBuilder = queryBuilder,
_realtimeTopic = realtimeTopic,
_realtimeClient = realtimeClient,
_schema = schema,
_table = table,
_uniqueColumns = primaryKey;
_uniqueColumns = primaryKey,
_channelConfig = channelConfig ?? const RealtimeChannelConfig();

/// Orders the result with the specified [column].
///
Expand Down Expand Up @@ -167,7 +179,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
);
}

_channel = _realtimeClient.channel(_realtimeTopic);
_channel = _realtimeClient.channel(_realtimeTopic, _channelConfig);

_channel!
.onPostgresChanges(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder {
required super.schema,
required super.table,
required super.primaryKey,
super.channelConfig,
});

/// Filters the results where [column] equals [value].
Expand Down
30 changes: 29 additions & 1 deletion packages/supabase/test/mock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ void main() {
Future<void> handleRequests(
HttpServer server, {
String? expectedFilter,
bool? expectedPrivate,
}) async {
await for (final HttpRequest request in server) {
final headers = request.headers;
Expand Down Expand Up @@ -113,8 +114,9 @@ void main() {
final requestJson = jsonDecode(request);
final topic = requestJson['topic'];
final ref = requestJson["ref"];
final event = requestJson['event'];

if (requestJson["event"] == "phx_leave") {
if (event == 'phx_leave') {
listeners.remove(topic);
return;
}
Expand All @@ -126,10 +128,15 @@ void main() {
final String? realtimeFilter = requestJson['payload']['config']
['postgres_changes']
.first['filter'];
final bool isPrivate =
requestJson['payload']['config']['private'] as bool;

if (expectedFilter != null) {
expect(realtimeFilter, expectedFilter);
}
if (expectedPrivate != null) {
expect(isPrivate, expectedPrivate);
}

final replyString = jsonEncode({
'event': 'phx_reply',
Expand Down Expand Up @@ -682,6 +689,27 @@ void main() {
});
});

group('stream() channel config', () {
test('forwards channelConfig.private=true to realtime join payload', () {
handleRequests(mockServer, expectedPrivate: true);

final stream = supabase.from('todos').stream(
primaryKey: ['id'],
channelConfig: const RealtimeChannelConfig(private: true),
);

expect(stream, emits(isList));
});

test('uses default private=false when channelConfig is omitted', () {
handleRequests(mockServer, expectedPrivate: false);

final stream = supabase.from('todos').stream(primaryKey: ['id']);

expect(stream, emits(isList));
});
});

group('Deprecated execute method', () {
test('should work with deprecated execute method', () {
handleRequests(mockServer);
Expand Down
Loading